home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Cream of the Crop 26
/
Cream of the Crop 26.iso
/
os2
/
pvm34b3.zip
/
pvm34b3
/
pvm3
/
src
/
pvmd.c
< prev
next >
Wrap
Text File
|
1997-08-08
|
104KB
|
4,718 lines
static char rcsid[] =
"$Id: pvmd.c,v 1.29 1997/07/16 21:10:41 pvmsrc Exp $";
/*
* PVM version 3.4: Parallel Virtual Machine System
* University of Tennessee, Knoxville TN.
* Oak Ridge National Laboratory, Oak Ridge TN.
* Emory University, Atlanta GA.
* Authors: J. J. Dongarra, G. E. Fagg, M. Fischer
* G. A. Geist, J. A. Kohl, R. J. Manchek, P. Mucci,
* P. M. Papadopoulos, S. L. Scott, and V. S. Sunderam
* (C) 1997 All Rights Reserved
*
* NOTICE
*
* Permission to use, copy, modify, and distribute this software and
* its documentation for any purpose and without fee is hereby granted
* provided that the above copyright notice appear in all copies and
* that both the copyright notice and this permission notice appear in
* supporting documentation.
*
* Neither the Institutions (Emory University, Oak Ridge National
* Laboratory, and University of Tennessee) nor the Authors make any
* representations about the suitability of this software for any
* purpose. This software is provided ``as is'' without express or
* implied warranty.
*
* PVM version 3 was funded in part by the U.S. Department of Energy,
* the National Science Foundation and the State of Tennessee.
*/
/*
* pvmd.c
*
* Mr. pvm daemon.
*
$Log: pvmd.c,v $
* Revision 1.29 1997/07/16 21:10:41 pvmsrc
* WIN32 Fixes - Markus.
*
* Revision 1.28 1997/07/02 20:27:29 pvmsrc
* Fixed startup race on shmem to that a shmem task can get fully
* configured before getting any messages.
* This involved adding two states
* TF_PRESHMCONN and TF_SHM. TF_PRESHMCONN indicates that messages
* with MM_PRIO set can be sent to a task, but regular messages are
* queued. This allows shmem tasks to be completely configured
* before any messages flow. When the daemon changes the state from
* TF_PRESHMCONN to TF_SHMCONN it calls shm_wrt_pkts to write any
* packets that were queued before task state changed to TF_SHMCONN.
*
* Revision 1.27 1997/06/27 21:17:17 pvmsrc
* Using sigset() to force daemon under SUNMP to catch SIGCLD.
*
* Revision 1.26 1997/06/27 19:26:09 pvmsrc
* Integrated WIN32 changes.
*
* Revision 1.25 1997/06/24 20:37:56 pvmsrc
* Installed new env var features:
* - PVM_PATH overrides default execution path (but not hostfile ep=).
* - PVM_WD overrides default working directory (but not hostfile wd=).
*
* Revision 1.24 1997/06/16 13:41:11 pvmsrc
* forkexec() passes extra info to taskers.
*
* Revision 1.23 1997/06/12 20:14:57 pvmsrc
* Added #define for MAXPATHLEN.
* - fix submitted by Brian Forney (bforney@cray.com)
* of Cray Research, Inc.
*
* Revision 1.22 1997/06/02 13:21:22 pvmsrc
* Added TM_SHMCONN test in pkt_to_task so that remote tasks write to local
* tasks via their shared memory instead of the t-d socket.
*
* Revision 1.21 1997/05/29 15:14:34 pvmsrc
* Added IMA_SGIMP64 to the infamous PVM_TIMET #define list. :-)
*
* Revision 1.20 1997/05/27 14:49:25 pvmsrc
* Added test for shmem and TF_SHMCONN to mesg_to_task():
* so that mpp_output is used insted of locloutput when a t-d
* socket exists when tasks are connected via shared memory
* correctly...
*
* Revision 1.19 1997/05/13 13:27:01 pvmsrc
* Fixed up SGI64 crapola - PVM_TIMET -> time_t.
*
* Revision 1.18 1997/05/08 21:06:30 pvmsrc
* defined PVM_TIMET to time_t for LINUX and LINUXSPARC
* and PVM_TIMET to long for all others so
* that calls to ctime don't generate warnings
*
* Revision 1.17 1997/05/07 21:19:19 pvmsrc
* swapped logic around SOCKLENISINT so that it is now
* the default path and SOCKLENISUINT is the compile-time
* selected path.
*
* Now, AIX 4.1 will have to remove SOCKLENISUINT from
* their AIXxxx.def configure files.
*
* Revision 1.16 1997/05/07 18:37:10 pvmsrc
* AIX 3.2 vs 4.1 vs 4.2 problems resolved (I hope)
* Define oslen as size_t with compile flag SOCKLENISINT
* for AIX 4.1 to explicitly set to int.
* To use flag - set in conf/AIX46K.def.
* Tested with cc and gcc on all 3 OS versions.
*
* Revision 1.15 1997/05/05 18:51:16 pvmsrc
* Removed Ifdefs so that select timeout was set properly for PGONs.
*
* Revision 1.14 1997/05/02 13:50:20 pvmsrc
* Support for SP2MPI. call mpp_init() mpp_output()
*
* Revision 1.13 1997/04/30 21:26:24 pvmsrc
* SGI Compiler Warning Cleanup.
*
* Revision 1.12 1997/04/21 14:58:25 pvmsrc
* Changed #ifdefs that checked IMA_RS6K,IMA_SP2MPI & IMA_AIX46K
* to see if select.h was needed into single define NEEDSSELECTH.
* New archs need to set this in conf/
*
* Revision 1.11 1997/04/03 16:13:55 pvmsrc
* 1) Fixed sequence number rollover bug (apparent only when nopax > 1)
* 2) Fixed queueing order of output pkts (opq). (apparent when nopax > 1)
* Packets could be queued in reverse order causing the remote daemon
* to reorder packets even when there were no transmission errors.
*
* Revision 1.10 1997/03/07 14:01:06 pvmsrc
* Support for Mpps.
*
* Revision 1.9 1997/02/13 15:10:02 pvmsrc
* Removed unnecessary extern for struct waitc *waitlist.
* - now in global.h.
*
* Revision 1.8 1997/01/28 19:27:02 pvmsrc
* New Copyright Notice & Authors.
*
* Revision 1.7 1996/11/26 19:18:33 pvmsrc
* Added code to prevent multiple pvmds for a single user on one machine.
* Define OVERLOADHOST if want to enable multiple pvmds.
*
* Revision 1.6 1996/10/25 13:57:59 pvmsrc
* Replaced old #includes for protocol headers:
* - <pvmsdpro.h>, "ddpro.h", "tdpro.h"
* With #include of new combined header:
* - <pvmproto.h>
*
* Revision 1.5 1996/10/24 21:31:36 pvmsrc
* Moved #include "global.h" to below other #includes for typing.
* Added #include <pvmtev.h> (to get TEV_* constants).
* Added decl for storing registered tracer info:
* - struct Pvmtracer pvmtracer.
* - init to { 0, 0, 0, 0, 0, 0, 0, 0, "" }.
* Changed comparisons on tp->t_outtid:
* - check for > 0, not just non-zero.
* - to handle new case where task sets to -1 to deny external collect.
*
* Revision 1.4 1996/10/14 19:17:03 pvmsrc
* Used ARCHFLAG SOCKLENISUINT where socket length is unsigned int
*
* Revision 1.3 1996/10/09 21:48:16 pvmsrc
* Problem:
* --------
* The problem was the result of AIX4.2 changing the expected datatype of
* the XxxxLength parameter in the following system function calls:
*
* int bind (Socket, Name, NameLength)
* int getsockname (Socket, Name, NameLength)
* int recvfrom (Socket, Buffer, Length, Flags, From, FromLength)
* int accept (Socket, Address, AddressLength)
*
* The compiler warning message generated was:
* Function argument assignment between types "unsigned long*" and "int*"
* is not allowed.
*
* Action:
* -------
* In PVM functions where oslen was already declared_
* changed declaration from: int oslen;
* to: unsigned int oslen;
*
* In PVM functions where a local "temporary" variable such as i, j, etc.
* was used_
* added declaration of: unsigned int oslen;
* and then changed to use oslen where necessary.
*
* Revision 1.2 1996/09/23 23:27:26 pvmsrc
* Initial Creation - original pvmd.c.
*
* Revision 1.40 1995/11/02 16:29:24 manchek
* added -t flag for test mode.
* put back save under packet header in netoutput.
* refragment in pkt_to_host now handles message header correctly
*
* Revision 1.39 1995/09/05 19:22:07 manchek
* forgot ifdef for SP2MPI
*
* Revision 1.38 1995/07/28 20:52:01 manchek
* missed changing src to pk_src in loclinpkt
*
* Revision 1.37 1995/07/28 16:40:59 manchek
* wrap HASERRORVARS around errno declarations
*
* Revision 1.36 1995/07/24 19:52:02 manchek
* message header no longer part of packet data, goes in pkt struct.
* socket drivers in {locl,net}{in,out}put must strip and reconstitute headers.
* no longer need to replicate first fragment of message to send,
* or to save-under.
* cleaned up line between loclinput and loclinpkt.
*
* Revision 1.35 1995/07/19 21:26:57 manchek
* use new function pvmnametag instead of [dts]mname
*
* Revision 1.34 1995/07/18 17:02:03 manchek
* added code to generate and check crc on each message (MCHECKSUM)
*
* Revision 1.33 1995/07/11 18:56:00 manchek
* main prints PVMSOCK instead of master_config (after mpp_init)
*
* Revision 1.32 1995/07/05 16:20:39 manchek
* work calls mpp_dredge for zombies if task with zero tid closes socket
* (possibly a shared memory task exiting)
*
* Revision 1.31 1995/07/03 19:16:24 manchek
* removed POWER4 ifdefs and misc. schmutz
*
* Revision 1.30 1995/06/28 15:27:33 manchek
* pvmbailout doesn't set global bailing_out
*
* Revision 1.29 1995/06/16 16:28:35 manchek
* (CSPP) CINDEX macro defined both in pvmd.c and system include file.
* can undef CINDEX before we define it for us
*
* Revision 1.28 1995/06/02 17:51:44 manchek
* added code to balance spawn (forks) on CSPP
*
* Revision 1.27 1995/05/30 17:46:59 manchek
* Added ifdefs for SP2MPI arch
*
* Revision 1.26 1995/05/17 16:31:57 manchek
* changed global mytid to pvmmytid.
* pvmbailout sets global bailing_out (used by shared memory code).
* use PVMDDEBUG envar to set pvmdebmask at startup.
* added new debug classes.
* use FDSETISINT.
* on LINUX systems, check sendto for ENOMEM.
*
* Revision 1.25 1995/02/06 22:40:11 manchek
* shared memory ports call mpp_setmtu before slave_config
*
* Revision 1.24 1995/02/06 18:52:24 manchek
* added debugging prints for when main select in work fails (solaris)
*
* Revision 1.23 1995/02/06 05:01:28 manchek
* hmm
*
* Revision 1.22 1995/02/03 16:45:27 manchek
* touch up reap - define rus as int if we don't think struct rusage exists
*
* Revision 1.21 1995/02/01 21:31:23 manchek
* added clear_opq_of, called when host is deleted from table or pvmd' exits
*
* Revision 1.20 1994/12/20 16:40:35 manchek
* use O_NONBLOCK for RS6K
*
* Revision 1.19 1994/11/08 19:05:07 manchek
* mpp fix?
*
* Revision 1.18 1994/11/08 15:30:51 manchek
* shared memory cleanup
*
* Revision 1.17 1994/10/15 19:27:02 manchek
* make wrk_fds_init(), use instead of FD_ZERO.
* don't send FIN|ACK to ourself in bailout.
* don't clean up task until SIGCHLD if TF_FORKD set.
* check newhosts when deleting host.
* cast message tags for comparison as integers.
* in beprime() call task_init instead of trying to clean up
*
* Revision 1.16 1994/09/02 15:48:17 manchek
* added UXPM ifdef to parallel SUN4SOL2
*
* Revision 1.15 1994/09/02 15:27:55 manchek
* forgot to inc refcount of nth fragment in sendmessage
*
* Revision 1.14 1994/07/18 19:21:51 manchek
* added PDMWAITC.
* fix to call write() with max 4096 length for RS6K
*
* Revision 1.13 1994/06/30 21:36:56 manchek
* don't check remote sockaddr in netinput() on LINUX
*
* Revision 1.12 1994/06/04 21:45:10 manchek
* added unix domain sockets
*
* Revision 1.11 1994/06/03 20:38:22 manchek
* version 3.3.0
*
* Revision 1.10 1993/12/20 15:39:28 manchek
* patch 6 from wcj
*
* Revision 1.9 1993/10/25 20:51:11 manchek
* make sure pvmd doesn't use 0..2 for sockets, etc. - open /dev/null.
* added code to change process group/disassoc. from tty (TTYDIS).
* ping other pvmds also when run state is PVMDHTUPD
*
* Revision 1.8 1993/10/12 14:18:37 manchek
* fixed bug in locloutput() - hung if write() returned 0
*
* Revision 1.7 1993/10/04 20:27:42 manchek
* renamed useruid to pvm_useruid for compat with libpvm
*
* Revision 1.6 1993/10/04 19:17:45 manchek
* on Solaris, sendto() can return ECHILD. Hahahahahaha!!!
*
* Revision 1.5 1993/10/04 19:12:25 manchek
* hd_txseq wasn't wrapped properly with NEXTSEQNUM
*
* Revision 1.4 1993/09/23 20:36:19 manchek
* fixed broken mca lookup
*
* Revision 1.3 1993/09/22 19:14:47 manchek
* added network resend statistic.
* removed redundant code in netinpkt() where it finds mca
*
* Revision 1.2 1993/09/16 21:45:32 manchek
* replaced reap() - now uses SYSVSIGNAL and NOWAIT3 macros
*
* Revision 1.1 1993/08/30 23:26:50 manchek
* Initial revision
*
*/
#ifndef WIN32
#include <sys/param.h>
#endif
#ifdef NEEDMENDIAN
#include <machine/endian.h>
#endif
#ifdef NEEDENDIAN
#include <endian.h>
#endif
#ifdef NEEDSENDIAN
#include <sys/endian.h>
#endif
#ifndef WIN32
#include <rpc/types.h>
#include <rpc/xdr.h>
#include <sys/time.h>
#include <sys/wait.h>
#include <sys/socket.h>
#include <sys/signal.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <pwd.h>
#else
#include "pvmwin.h"
#include "..\xdr\types.h"
#include "..\xdr\xdr.h"
#include <time.h>
#endif
#ifdef IMA_TITN
#include <bsd/sys/types.h>
#else
#include <sys/types.h>
#ifndef WIN32
#include <sys/ioctl.h>
#endif
#endif
#ifndef NOWAIT3
#include <sys/resource.h>
#endif
#ifdef NEEDSSELECTH
#include <sys/select.h>
#endif
#include <sys/stat.h>
#ifndef NOUNIXDOM
#include <sys/un.h>
#endif
#include <fcntl.h>
#include <errno.h>
#include <stdio.h>
#include <signal.h>
#include <ctype.h>
/* Must come before local CINDEX macro definition */
#if defined(IMA_CSPP) && defined(BALANCED_SPAWN)
#include <sys/cnx_types.h>
#include <sys/cnx_sysinfo.h>
#include <sys/cnx_pattr.h>
#undef CINDEX
#endif
#ifdef SYSVSTR
#include <string.h>
#define CINDEX(s,c) strchr(s,c)
#else
#include <strings.h>
#define CINDEX(s,c) index(s,c)
#endif
#include <pvm3.h>
#include <pvmproto.h>
#include "pvmalloc.h"
#include "host.h"
#include "pvmdabuf.h"
#include "pvmfrag.h"
#include "pmsg.h"
#include "pkt.h"
#include "task.h"
#include "waitc.h"
#include "listmac.h"
#include "tvdefs.h"
#if defined(IMA_PGON) || defined(IMA_I860) || defined(IMA_CM5) || defined(SHMEM) || defined(IMA_SP2MPI)
#include "pvmdmp.h"
#endif
#include "bfunc.h"
#include <pvmtev.h>
#include "global.h"
#ifdef IMA_CRAY
#define MAXPATHLEN PATH_MAX+1
#endif
#ifndef max
#define max(a,b) ((a)>(b)?(a):(b))
#endif
#ifndef min
#define min(a,b) ((a)<(b)?(a):(b))
#endif
#ifndef TTYDIS
#define TTYDIS 0
#endif
#ifndef SOMAXCONN
#define SOMAXCONN 5
#endif
#ifdef NOTMPNAM
#define TMPNAMFUN(x) pvmtmpnam(x)
#define LEN_OF_TMP_NAM 64
char *pvmtmpnam();
#else /*NOTMPNAM*/
#define TMPNAMFUN(x) tmpnam(x)
#ifdef L_tmpnam
#define LEN_OF_TMP_NAM L_tmpnam
#else
#define LEN_OF_TMP_NAM 64
#endif
#endif /*NOTMPNAM*/
#ifdef STATISTICS
struct statistics {
int selneg, selzer, selrdy; /* neg, zero, ready selects */
int rdneg, rdzer, rdok; /* neg, zero, positive reads */
int wrneg, wrzer, wrshr, wrok; /* neg, zero, short, ok writes */
int sdneg, sdok; /* neg, ok sendtos */
int rfok; /* ok recvfroms */
int refrag; /* refragmented frags */
int netret; /* network resends */
};
struct statistics stats;
#endif
struct deaddata {
int dd_pid; /* process id */
int dd_es; /* unix exit status */
struct timeval dd_ut; /* user time used */
struct timeval dd_st; /* system time used */
};
void catch();
char *debug_flags();
char *pvmnametag();
void biteme();
void evilsig();
char *getenv();
char *inadport_decimal();
char *inadport_hex();
void pvmbailout();
char *pvmdsockfile();
char *pvmgethome();
char *pvmgetroot();
void reap();
void i_dump();
void hex_inadport __ProtoGlarp__ (( char *, struct sockaddr_in * ));
void mesg_rewind __ProtoGlarp__ (( struct pmsg * ));
#if defined(IMA_LINUX) || defined(IMA_LINUXSPARC) || defined(IMA_SGI64) || defined(IMA_SGIMP64)
#define PVM_TIMET time_t
#else
#define PVM_TIMET long
#endif
#if defined(IMA_PGON) && !defined(IMA_MPP)
#define IMA_MPP
#endif
/***************
** Globals **
** **
***************/
#ifndef HASERRORVARS
extern int errno;
#endif
extern struct task *locltasks; /* from task.c */
extern int log_fd; /* from logging.c */
char **epaths = 0; /* exec search path */
char *debugger = 0; /* debugger executable */
int pvmdebmask = 0; /* which debugging info */
struct htab *filehosts = 0; /* advisory host table from hostfile */
struct htab *hosts = 0; /* active host table */
int hostertid = 0; /* slave pvmd starter task */
char *loclsnam = 0; /* t-d socket or addr file path */
int loclsock = -1; /* pvmd-task master tcp socket */
#ifndef NOUNIXDOM
char *loclspath = 0; /* t-d socket path */
#endif
char *myarchname = ARCHCLASS;
int myhostpart = 0; /* host number shifted to tid host field */
int pvmfrgsiz = UDPMAXLEN; /* message frag length (to pack) */
int pvmmydsig = 0; /* host native data enc */
int pvmmytid = 0; /* this pvmd tid */
int pvmmyupid = -1; /* pvmd pid */
int netsock = -1; /* host-host udp socket */
int nopax = 1; /* number of outstanding pkts on d-d link */
struct htab *newhosts = 0; /* hosts being added by pvmd' */
struct htab *oldhosts = 0; /* real host table (for pvmd') */
int pvmudpmtu = UDPMAXLEN; /* local UDP MTU */
int ppnetsock = -1; /* pvmd' host-host udp socket */
int pprime = 0; /* pvmd' pid for when we're forked */
int runstate = 0; /* pvmd run state */
int pvmschedtid = 0; /* scheduler task id */
int taskertid = 0; /* task starter task */
struct Pvmtracer pvmtracer = /* task tracer info struct */
{ 0, 0, 0, 0, 0, 0, 0, 0, "" };
int tidhmask = TIDHOST; /* mask for host field of tids */
int tidlmask = TIDLOCAL; /* mask for local field of tids */
int pvm_useruid = -1; /* our uid */
char *username = 0; /* our loginname */
#ifdef WIN32
/* the userid is not available in WIN 32! */
/* some useless binary sid struct */
char *pvmtmpspec=0;
int system_loser_win=FALSE;
int nAlert=SO_SYNCHRONOUS_NONALERT;
int nFileHandle;
WSADATA WSAData;
LPOSVERSIONINFO osinfo=0;
BOOL os_is_NT=FALSE;
#endif
/***************
** Private **
** **
***************/
static int totprobes = 0;
static int altprobe = 100; /* ratio of mpp_input() to select for pgons */
static struct deaddata *deads = 0; /* circ queue of dead task data */
static int ndead = 0; /* len of deads */
static struct pkt *opq = 0; /* outstanding pkt q to all hosts */
static int rdead = 0; /* read ptr for deads */
static int slavemode = 0; /* started by master pvmd */
static struct pmsg *addmesg = 0; /* message to self to add slaves */
static int wdead = 0; /* write ptr for deads */
static fd_set wrk_rfds; /* fd_sets for select() in work() */
static fd_set wrk_wfds;
/*
static fd_set wrk_efds;
*/
static int wrk_nfds = 0; /* 1 + highest bit set in fds */
main(argc, argv)
int argc;
char **argv;
{
int i, j;
char *name = "";
struct passwd *pe;
int testmode = 0;
struct timeval tnow;
char buf[128];
#ifndef WIN32
/* make sure 0, 1, 2 are in use */
(void)open("/dev/null", O_RDONLY, 0);
(void)open("/dev/null", O_RDONLY, 0);
(void)open("/dev/null", O_RDONLY, 0);
#else
/* WSAStartup has to be called before any socket command */
/* can be executed. Why ? Ask Bill */
if (WSAStartup(0x0101, &WSAData) != 0) {
printf("\nWSAStartup() failed\n");
ExitProcess(1);
}
setsockopt(INVALID_SOCKET,SOL_SOCKET,SO_OPENTYPE,
(char *)&nAlert,sizeof(int));
osinfo=malloc(sizeof(LPOSVERSIONINFO));
osinfo->dwOSVersionInfoSize = sizeof(OSVERSIONINFO);
GetVersionEx(osinfo);
os_is_NT= ( osinfo->dwPlatformId == VER_PLATFORM_WIN32_NT);
#endif
{
char *p;
if (p = getenv("PVMDDEBUG"))
pvmdebmask = pvmstrtoi(p);
}
#ifndef WIN32
if ((pvm_useruid = getuid()) == -1) {
pvmlogerror("main() can't getuid()\n");
pvmbailout(0);
}
#else
username = MyGetUserName();
#endif
pvmsetlog(3);
#ifndef WIN32
if (pe = getpwuid(pvm_useruid))
username = STRALLOC(pe->pw_name);
else
pvmlogerror("main() can't getpwuid\n");
endpwent();
#endif
#ifdef WIN32
if ((pvmmyupid = _getpid()) == -1) {
pvmlogerror("main() can't getpid() %d \n",GetLastError());
pvmbailout(0);
}
#else
if ((pvmmyupid = getpid()) == -1) {
pvmlogerror("main() can't getpid()\n");
pvmbailout(0);
}
#endif
(void)pvmgetroot(); /* fail here if we can't */
sprintf(buf, "PVM_ARCH=%s", myarchname);
pvmputenv(STRALLOC(buf));
pvmmydsig = pvmgetdsig();
ppi_config(&argc, argv);
#if defined(IMA_PGON) || defined(IMA_SP2MPI)
mpp_init(&argc, argv);
#endif
for (i = j = 1; i < argc; i++) {
if (argv[i][0] == '-') {
switch (argv[i][1]) {
case 'd':
pvmdebmask = pvmstrtoi(argv[i] + 2);
break;
case 'n':
name = argv[i] + 2;
break;
case 'S':
argv[j++] = argv[i];
case 's':
slavemode = 1;
break;
case 't':
testmode = 1;
break;
#ifdef WIN32
case 'u':
if (os_is_NT==FALSE) {
/* someone is on win95 ... */
argv[i]++;argv[i]++;
strcpy(username,argv[i]);
}
else
if (strcmp(strcat("-u",username),argv[i])) {
fprintf(stderr,
"Provided Username(%s) does not match ",
username);
fprintf(stderr,"with OS account(%s).\n",
argv[i]);
fprintf(stderr,"Wrong rshd ?!\n");
exit(1);
}
break;
#endif
default:
argv[j++] = argv[i];
}
} else {
argv[j++] = argv[i];
}
}
argc = j;
if (pvmdebmask) {
pvmlogprintf("version %s\n", PVM_VER);
pvmlogprintf("ddpro %d tdpro %d\n", DDPROTOCOL, TDPROTOCOL);
pvmlogprintf("main() debug mask is 0x%x (%s)\n",
pvmdebmask, debug_flags(pvmdebmask));
}
if (!*name) {
if (gethostname(buf, sizeof(buf)-1) == -1) {
pvmlogerror("main() can't gethostname()\n");
pvmbailout(0);
}
name = buf;
}
if (testmode) {
gettimeofday(&tnow, (struct timezone*)0);
pvmlogprintf("version %s ddpro %d tdpro %d sdpro %d\n",
PVM_VER, DDPROTOCOL, TDPROTOCOL, SDPROTOCOL);
pvmlogprintf(ctime((PVM_TIMET *) &tnow.tv_sec));
for (i = 0; i < argc; i++)
pvmlogprintf("argv[%d]=\"%s\"\n", i, argv[i]);
exit(0);
}
if (slavemode) /* slave pvmd */
slave_config(name, argc, argv);
else /* master pvmd */
master_config(name, argc, argv);
#if TTYDIS & 8
setsid();
#endif
#if TTYDIS & 4
setpgid(0, 0);
#endif
#if TTYDIS & 2
setpgrp(0, 0);
#endif
#if TTYDIS & 1
if ((i = open("/dev/tty", O_RDWR, 0)) != -1) {
(void)ioctl(i, TIOCNOTTY, 0);
(void)close(i);
}
#endif
myhostpart = hosts->ht_local << (ffs(tidhmask) - 1);
pvmmytid = myhostpart | TIDPVMD;
ndead = 1000; /* XXX hum, static limit makes this easy to do */
/* deads = TALLOC(ndead, int, "pids"); */
deads = TALLOC(ndead, struct deaddata, "dead");
BZERO((char*)deads, ndead * sizeof(struct deaddata));
#ifndef WIN32
/* no signaling in win32 because no parent child relation ... :-( */
#ifndef IMA_I860 /* this signal interferes with getcube() on I860 */
#ifdef SYSVSIGNAL
(void)signal(SIGCLD, reap);
#ifdef IMA_SUNMP
sigset(SIGCLD, reap); /* yep we go really want to catch our kids */
#endif
#else
(void)signal(SIGCHLD, reap);
#endif /* SYSVSIGNAL */
#endif /*IMA_I860*/
#endif
if (signal(SIGINT, SIG_IGN) != SIG_IGN)
#ifdef WIN32
(void)signal(SIGINT, (void*) catch);
#else
(void)signal(SIGINT, catch);
#endif
if (signal(SIGTERM, SIG_IGN) != SIG_IGN)
#ifdef WIN32
(void)signal(SIGTERM, (void *) catch);
#else
(void)signal(SIGTERM, catch);
#endif
#ifndef WIN32
(void)signal(SIGHUP, SIG_IGN);
(void)signal(SIGPIPE, SIG_IGN);
#endif
#ifndef WIN32
(void)signal(SIGFPE, evilsig);
(void)signal(SIGILL, evilsig);
#else
(void)signal(SIGFPE, (void *)evilsig);
(void)signal(SIGILL, (void *)evilsig);
#endif
#ifdef SIGBUS
(void)signal(SIGBUS, evilsig);
#endif
#ifndef WIN32
(void)signal(SIGSEGV, evilsig);
#else
(void)signal(SIGSEGV, (void *)evilsig);
#endif
#ifdef SIGSYS
(void)signal(SIGSYS, evilsig);
#endif
#ifdef SIGDANGER
(void)signal(SIGDANGER, biteme);
#endif
#ifdef STATISTICS
reset_statistics();
#endif
task_init();
wait_init(myhostpart, TIDLOCAL);
mb_init();
ppi_init();
opq = pk_new(0);
opq->pk_tlink = opq->pk_trlink = opq;
/* print local socket address on stdout in case someone cares */
if (!slavemode) {
printf("%s\n", getenv("PVMSOCK"));
fflush(stdout);
}
/* XXX hack to start slaves automatically */
if (!slavemode && filehosts) {
struct hostd *hp;
int hh;
int n = 0;
for (hh = filehosts->ht_last; hh >= 1; hh--)
if ((hp = filehosts->ht_hosts[hh]) && !(hp->hd_flag & HF_NOSTART))
n++;
if (n) {
addmesg = mesg_new(0);
addmesg->m_tag = DM_ADD;
pkint(addmesg, n);
for (hh = 1; hh <= filehosts->ht_last; hh++)
if ((hp = filehosts->ht_hosts[hh]) && !(hp->hd_flag & HF_NOSTART))
pkstr(addmesg, hp->hd_name);
addmesg->m_dst = TIDPVMD;
}
}
work();
pvmbailout(0); /* not reached */
exit(0);
}
static char *ffnames[] = {
"SOM", "EOM", "DAT", "FIN", "ACK"
};
char *
pkt_flags(ff)
int ff;
{
static char buf[64];
int bit, i;
buf[0] = 0;
for (bit = 1, i = 0; i < sizeof(ffnames)/sizeof(ffnames[0]); i++, bit *= 2)
if (ff & bit) {
if (buf[0])
strcat(buf, ",");
strcat(buf, ffnames[i]);
}
if (!buf[0])
strcpy(buf, "0");
return buf;
}
void
evilsig(sig)
int sig;
{
if (runstate == PVMDISTASK)
exit(sig);
(void)signal(SIGILL, SIG_DFL);
(void)signal(SIGFPE, SIG_DFL);
#ifdef SIGBUS
(void)signal(SIGBUS, SIG_DFL);
#endif
(void)signal(SIGSEGV, SIG_DFL);
#ifdef SIGSYS
(void)signal(SIGSYS, SIG_DFL);
#endif
(void)signal(SIGINT, SIG_DFL);
(void)signal(SIGTERM, SIG_DFL);
#ifndef WIN32
(void)signal(SIGHUP, SIG_DFL);
(void)signal(SIGPIPE, SIG_DFL);
#endif
#ifdef SYSVSIGNAL
(void)signal(SIGCLD, SIG_DFL);
#else
#ifndef WIN32
(void)signal(SIGCHLD, SIG_DFL);
#endif
#endif
pvmlogprintf("evilsig() caught signal %d\n", sig);
i_dump(1);
/*
abort();
*/
pvmbailout(-sig);
}
void
catch(sig)
int sig;
{
if (runstate == PVMDISTASK)
exit(sig);
(void)signal(SIGINT, SIG_DFL);
(void)signal(SIGTERM, SIG_DFL);
pvmlogprintf("catch() caught signal %d\n", sig);
pvmbailout(sig);
}
#ifdef SIGDANGER
void
biteme(sig)
int sig;
{
pvmlogprintf("biteme() caught signal %d and spaced it.\n", sig);
pvmlogerror("the mad fools, when will they learn?\n");
#ifdef SYSVSIGNAL
(void)signal(SIGDANGER, biteme);
#endif
}
#endif /*SIGDANGER*/
/* reap()
*
* Child process has exited. Put its pid in the fifo of tasks
* to be cleaned up (in the work loop).
*/
void
reap(sig)
int sig;
{
int pid;
int es = 0;
#ifndef WIN32
#ifndef NOWAIT3
#if defined(RUSAGE_SELF)
struct rusage rus;
#else
int rus;
#endif
#endif
sig = sig;
#ifdef NOWAIT3
#ifdef NOWAITPID
if ((pid = wait(&es)) > 0)
#else
while ((pid = waitpid(-1, &es, WNOHANG)) > 0)
#endif
#else /*NOWAIT3*/
while ((pid = wait3(&es, WNOHANG, &rus)) > 0)
#endif /*NOWAIT3*/
{
#if !defined(NOWAIT3) && defined(RUSAGE_SELF)
deads[wdead].dd_ut = rus.ru_utime;
deads[wdead].dd_st = rus.ru_stime;
#else
deads[wdead].dd_ut.tv_sec = 0;
deads[wdead].dd_ut.tv_usec = 0;
deads[wdead].dd_st.tv_sec = 0;
deads[wdead].dd_st.tv_usec = 0;
#endif
deads[wdead].dd_pid = pid;
deads[wdead].dd_es = es;
if (++wdead >= ndead)
wdead = 0;
}
#ifdef SYSVSIGNAL
(void)signal(SIGCLD, reap);
#endif
#endif
}
/* pvmbailout()
*
* We're hosed. Clean up as much as possible and exit.
*/
void
pvmbailout(n)
int n;
{
struct task *tp;
pvmlogprintf("pvmbailout(%d)\n", n);
/* sockaddr file */
if (loclsnam)
(void)unlink(loclsnam);
/* kill local tasks */
#ifdef SHMEM
mpp_cleanup();
#endif
if (locltasks)
for (tp = locltasks->t_link; tp != locltasks; tp = tp->t_link) {
if (tp->t_pid)
#ifndef WIN32
#ifdef IMA_OS2
(void)os2_kill(tp->t_pid, SIGTERM);
#else
(void)kill(tp->t_pid, SIGTERM);
#endif
#else
(void)kill(tp->t_pid,tp->t_handle,SIGTERM);
#endif
if (tp->t_authnam)
(void)unlink(tp->t_authnam);
}
/* shutdown slave pvmds / notify master */
if (netsock != -1) {
char dummy[DDFRAGHDR];
int hh;
struct hostd *hp;
if (pvmdebmask)
pvmlogerror("sending FIN|ACK to all pvmds\n");
for (hh = hosts->ht_last; hh >= 1; hh--)
if ((hp = hosts->ht_hosts[hh]) && hp->hd_hostpart != myhostpart) {
pvmput32(dummy, hp->hd_hostpart | TIDPVMD);
pvmput32(dummy + 4, myhostpart | TIDPVMD);
pvmput16(dummy + 8, 0);
pvmput16(dummy + 10, 0);
pvmput8(dummy + 12, FFFIN|FFACK);
sendto(netsock, dummy, DDFRAGHDR, 0,
(struct sockaddr*)&hp->hd_sad, sizeof(hp->hd_sad));
}
}
#ifndef NOUNIXDOM
if (loclspath)
(void)unlink(loclspath);
#endif
if (n < 0)
abort();
exit(n);
}
void
wrk_fds_init()
{
wrk_nfds = 0;
FD_ZERO(&wrk_rfds);
FD_ZERO(&wrk_wfds);
/*
FD_ZERO(&wrk_efds);
*/
}
wrk_fds_add(fd, sets)
int fd; /* the fd */
int sets; /* which sets */
{
#ifdef SANITY
if (fd < 0 || fd >= FD_SETSIZE) {
pvmlogprintf("wrk_fds_add() bad fd %d\n", fd);
return 1;
}
#endif
if (sets & 1)
FD_SET(fd, &wrk_rfds);
if (sets & 2)
FD_SET(fd, &wrk_wfds);
/*
if (sets & 4)
FD_SET(fd, &wrk_efds);
*/
/* if this is new highest, adjust nfds */
if (fd >= wrk_nfds)
wrk_nfds = fd + 1;
return 0;
}
wrk_fds_delete(fd, sets)
int fd; /* the fd */
int sets; /* which sets */
{
#ifdef SANITY
if (fd < 0 || fd >= FD_SETSIZE) {
pvmlogprintf("wrk_fds_delete() bad fd %d\n", fd);
return 1;
}
#endif
if (sets & 1)
FD_CLR(fd, &wrk_rfds);
if (sets & 2)
FD_CLR(fd, &wrk_wfds);
/*
if (sets & 4)
FD_CLR(fd, &wrk_efds);
*/
/* if this was highest, may have to adjust nfds to new highest */
if (fd + 1 == wrk_nfds)
while (wrk_nfds > 0) {
wrk_nfds--;
if (FD_ISSET(wrk_nfds, &wrk_rfds)
|| FD_ISSET(wrk_nfds, &wrk_wfds)
/*
|| FD_ISSET(wrk_nfds, &wrk_efds)
*/
) {
wrk_nfds++;
break;
}
}
return 0;
}
/* clear_opq_of()
*
* Clear packets dst for host in opq but _not_ in host hd_opq.
*/
int
clear_opq_of(tid)
int tid; /* host */
{
struct pkt *pp, *pp2;
for (pp = opq->pk_tlink; pp != opq; pp = pp->pk_tlink) {
if (pp->pk_dst == tid && !pp->pk_link) {
pp2 = pp->pk_trlink;
LISTDELETE(pp, pk_tlink, pk_trlink);
pk_free(pp);
pp = pp2;
}
}
return 0;
}
/* work()
*
* The whole sausage
*/
work()
{
static int lastpinged = 0; /* host that got last keepalive message */
fd_set rfds, wfds; /* result of select */
/*
fd_set efds;
*/
int nrdy; /* number of fds ready after select */
struct timeval tbail; /* time to bail if state = STARTUP */
struct timeval tping; /* time to send next keepalive packet */
struct timeval tnow;
struct timeval tout;
struct pmsg *mp;
struct task *tp;
struct hostd *hp;
#if defined(IMA_PGON) || defined(IMA_I860) || defined(SHMEM)
int nodemsg = 0;
#if defined(IMA_PGON)
struct timeval tpgon; /* time to spend in paragon select */
double tbpl; /* time at beginning of probe loop */
double toutpl; /* timeout in the probe loop */
int timed_out;
extern double dclock();
#endif
#endif
#ifdef SHMEM
int someclosed;
#endif
gettimeofday(&tnow, (struct timezone*)0);
if (pvmdebmask || myhostpart) {
pvmlogprintf("%s (%s) %s %s\n",
hosts->ht_hosts[hosts->ht_local]->hd_name,
inadport_decimal(&hosts->ht_hosts[hosts->ht_local]->hd_sad),
myarchname,
PVM_VER);
pvmlogprintf("ready ");
pvmlogprintf(ctime((PVM_TIMET *) &tnow.tv_sec));
}
/*
* remind myself to start those pesky slave pvmds
*/
if (addmesg) {
struct pmsg *mp = addmesg;
addmesg = 0;
sendmessage(mp);
}
/*
* init bail (for PVMDSTARTUP) and ping times
*/
pvmgetclock(&tnow);
tout.tv_sec = DDBAILTIME;
tout.tv_usec = 0;
TVXADDY(&tbail, &tnow, &tout);
tout.tv_sec = DDPINGTIME;
tout.tv_usec = 0;
TVXADDY(&tping, &tnow, &tout);
/* init select fd sets */
wrk_fds_init();
if (loclsock >= 0)
wrk_fds_add(loclsock, 1);
wrk_fds_add(netsock, 1);
for (; ; ) {
/*
* clean up after any tasks that we got SIGCHLDs for
*/
while (rdead != wdead) {
if (deads[rdead].dd_pid == pprime) {
int cc;
#ifdef SOCKLENISUINT
size_t oslen;
#else
int oslen;
#endif
struct sockaddr_in osad;
struct timeval t;
char buf[DDFRAGHDR];
hostfailentry(hosts->ht_hosts[0]);
clear_opq_of((int)(TIDPVMD | hosts->ht_hosts[0]->hd_hostpart));
pprime = 0;
while (1) {
FD_ZERO(&rfds);
FD_SET(ppnetsock, &rfds);
t.tv_sec = 0;
t.tv_usec = 0;
cc = select(ppnetsock + 1,
#ifdef FDSETISINT
(int *)&rfds, (int *)0, (int *)0,
#else
&rfds, (fd_set *)0, (fd_set *)0,
#endif
&t);
if (cc == 1) {
oslen = sizeof(osad);
recvfrom(ppnetsock, buf, sizeof(buf),
0, (struct sockaddr*)&osad, &oslen);
} else if (cc != -1 || errno != EINTR)
break;
}
} else {
if (tp = task_findpid(deads[rdead].dd_pid)) {
/* check for output one last time
XXX this could be cleaner by going through main select again
XXX before flushing the task */
tp->t_status = deads[rdead].dd_es;
tp->t_utime = deads[rdead].dd_ut;
tp->t_stime = deads[rdead].dd_st;
while (tp->t_out >= 0) {
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(tp->t_out, &rfds);
TVCLEAR(&tout);
if (select(tp->t_out + 1,
#ifdef FDSETISINT
(int *)&rfds, (int *)0, (int *)0,
#else
&rfds, (fd_set *)0, (fd_set *)0,
#endif
&tout) == 1)
loclstout(tp);
else
break;
}
#if defined(IMA_PGON) || defined(IMA_SP2MPI)
mpp_free(tp);
#endif
task_cleanup(tp);
task_free(tp);
}
}
if (++rdead >= ndead)
rdead = 0;
}
netoutput();
if (runstate == PVMDHALTING) {
pvmlogerror("work() pvmd halting\n");
pvmbailout(0);
}
/* bail if new slave and haven't been configured for too long */
pvmgetclock(&tnow);
if (runstate == PVMDSTARTUP && TVXLTY(&tbail, &tnow)) {
pvmlogerror("work() run = STARTUP, timed out waiting for master\n");
pvmbailout(0);
}
/*
* send keepalive message to remote pvmd once in a while
*/
if (TVXLTY(&tping, &tnow)) {
if (pvmdebmask & (PDMPACKET|PDMSELECT))
pvmlogerror("work() ping timer\n");
if (runstate == PVMDNORMAL || runstate == PVMDHTUPD) {
do {
if (++lastpinged > hosts->ht_last)
lastpinged = 1;
} while (!(hp = hosts->ht_hosts[lastpinged]));
if (hp->hd_hostpart != myhostpart
&& hp->hd_txq->pk_link == hp->hd_txq) {
mp = mesg_new(0);
mp->m_tag = DM_NULL;
mp->m_dst = hp->hd_hostpart | TIDPVMD;
sendmessage(mp);
}
}
tout.tv_sec = DDPINGTIME;
tout.tv_usec = 0;
TVXADDY(&tping, &tnow, &tout);
}
/*
* figure select timeout
*/
if (opq->pk_tlink == opq)
tout = tping;
else
tout = opq->pk_tlink->pk_rtv;
if (TVXLTY(&tout, &tnow)) {
TVCLEAR(&tout);
} else {
TVXSUBY(&tout, &tout, &tnow);
}
if (pvmdebmask & PDMSELECT) {
pvmlogprintf("work() select tout is %d.%06d\n",
tout.tv_sec, tout.tv_usec);
}
#ifdef SHMEM
if ((nodemsg = mpp_probe()) == 1) {
mpp_input();
TVCLEAR(&tout);
}
#endif
rfds = wrk_rfds;
wfds = wrk_wfds;
/*
efds = wrk_efds;
*/
if (pvmdebmask & PDMSELECT) {
pvmlogprintf("work() wrk_nfds=%d\n", wrk_nfds);
print_fdset("work() rfds=", wrk_nfds, &rfds);
print_fdset("work() wfds=", wrk_nfds, &wfds);
}
#if !defined(IMA_PGON) && !defined(IMA_I860)
if ((nrdy = select(wrk_nfds,
#ifdef FDSETISINT
(int *)&rfds, (int *)&wfds, (int *)0,
#else
&rfds, &wfds, (fd_set *)0,
#endif
&tout)) == -1) {
if (errno != EINTR) {
pvmlogperror("work() select");
pvmlogprintf(" wrk_nfds=%d\n", wrk_nfds);
print_fdset(" rfds=", wrk_nfds, &wrk_rfds);
print_fdset(" wfds=", wrk_nfds, &wrk_wfds);
pvmlogprintf(" netsock=%d, ppnetsock=%d, loclsock=%d\n",
netsock, ppnetsock, loclsock);
task_dump();
pvmbailout(0);
}
}
#else /*IMA_PGON/IMA_I860*/
timed_out = 0;
toutpl = (double)tout.tv_sec + 1e-6*((double)tout.tv_usec);
tbpl = dclock(); /* use the pgon hw clock */
if (pvmdebmask & PDMSELECT) {
pvmlogprintf(
"work() probe loop timeout is %f dclock is %f\n",
toutpl,tbpl);
}
do {
totprobes++;
if ((nodemsg = mpp_input()) > 1)
{
mpp_input();
TVCLEAR(&tpgon);
} else {
tout.tv_sec = 0;
tout.tv_usec = TIMEOUT;
}
rfds = wrk_rfds;
wfds = wrk_wfds;
nrdy = 0;
if (totprobes % altprobe == 0)
{
totprobes = 0;
if ((nrdy = select(wrk_nfds,
#ifdef FDSETISINT
(int *)&rfds, (int *)&wfds, (int *)0,
#else
&rfds, &wfds, (fd_set *)0,
#endif
&tout))
== -1) {
if (errno != EINTR) {
pvmlogperror("work() select");
pvmbailout(0);
}
}
if ( (dclock() - tbpl ) >= toutpl)
timed_out = 1;
else
timed_out = 0;
if (timed_out && pvmdebmask & PDMSELECT)
{
pvmlogprintf(
"work() probe loop timeout, dclock %f\n",
dclock());
}
}
/* Try to send packets that are still on the mpp output q */
mpp_output( (struct task *) NULL, (struct pkt *) NULL);
} while(!(nrdy || nodemsg || timed_out));
#endif /*IMA_PGON/IMA_I860*/
#ifdef STATISTICS
switch (nrdy) {
case -1:
stats.selneg++;
break;
case 0:
stats.selzer++;
break;
default:
stats.selrdy++;
break;
}
#endif
if (pvmdebmask & PDMSELECT) {
pvmlogprintf("work() SELECT returns %d\n", nrdy);
}
/*
* check network socket and local master socket for action
*/
if (nrdy > 0) {
if (FD_ISSET(netsock, &rfds)) {
nrdy--;
netinput();
}
if (loclsock >= 0 && FD_ISSET(loclsock, &rfds)) {
nrdy--;
loclconn();
}
}
/*
* check tasks for action
*/
#ifdef SHMEM
someclosed = 0;
#endif
if (loclsock >= 0) {
for (tp = locltasks->t_link;
nrdy > 0 && tp != locltasks;
tp = tp->t_link) {
if (tp->t_sock >= 0 && FD_ISSET(tp->t_sock, &rfds)) {
FD_CLR(tp->t_sock, &rfds);
nrdy--;
if (loclinput(tp)) {
#ifdef SHMEM
if (tp->t_tid == 0)
someclosed++;
#endif
if (pvmdebmask & PDMTASK) {
pvmlogprintf(
"work() error reading from t%x, marking dead\n",
tp->t_tid);
}
if (!(tp->t_flag & TF_FORKD)) {
tp = tp->t_rlink;
task_cleanup(tp->t_link);
task_free(tp->t_link);
} else
wrk_fds_delete(tp->t_sock, 3);
continue;
}
}
if (tp->t_sock >= 0 && FD_ISSET(tp->t_sock, &wfds)) {
FD_CLR(tp->t_sock, &wfds);
nrdy--;
if (locloutput(tp)) {
#ifdef SHMEM
if (tp->t_tid == 0)
someclosed++;
#endif
if (!(tp->t_flag & TF_FORKD)) {
tp = tp->t_rlink;
task_cleanup(tp->t_link);
task_free(tp->t_link);
} else
wrk_fds_delete(tp->t_sock, 3);
continue;
}
}
if (tp->t_out >= 0 && FD_ISSET(tp->t_out, &rfds)) {
FD_CLR(tp->t_out, &rfds);
nrdy--;
loclstout(tp);
}
}
}
#if defined(IMA_CM5) || defined(IMA_SP2MPI)
mpp_output((struct task *)0, (struct pkt *)0);
#endif
#ifdef SHMEM
if (someclosed)
mpp_dredge();
#endif
}
}
/* netoutput()
*
* Send packets out the wire to remote pvmds.
*/
netoutput()
{
struct timeval tnow, tx;
struct pkt *pp, *pp2;
struct hostd *hp;
char *cp;
int len;
int cc;
char dummy[DDFRAGHDR];
/*
len = 0;
for (pp = opq->pk_tlink; pp != opq; pp = pp->pk_tlink)
len++;
pvmlogprintf("netoutput() %d in opq\n", len);
*/
if (opq->pk_tlink == opq)
return 0;
/*
* send any pkts whose time has come
*/
pvmgetclock(&tnow);
while ((pp = opq->pk_tlink) != opq && TVXLTY(&pp->pk_rtv, &tnow)) {
/*
* fail if we've tried too hard
*/
hp = pp->pk_hostd;
if (pp->pk_nrt >= DDMINRETRIES
&& pp->pk_rto.tv_sec >= DDMINTIMEOUT) { /* host is toast */
pvmlogprintf(
"netoutput() timed out sending to %s after %d, %d.%06d\n",
hp->hd_name, pp->pk_nrt,
pp->pk_rto.tv_sec, pp->pk_rto.tv_usec);
hd_dump(hp);
hostfailentry(hp);
clear_opq_of((int)(TIDPVMD | hp->hd_hostpart));
ht_delete(hosts, hp);
if (newhosts)
ht_delete(newhosts, hp);
continue;
}
cp = pp->pk_dat;
len = pp->pk_len;
if (pp->pk_flag & FFSOM) {
cp -= MSGHDRLEN;
len += MSGHDRLEN;
if (cp < pp->pk_buf) {
pvmlogerror("netoutput() no headroom for message header\n");
return 0;
}
pvmput32(cp, pp->pk_enc);
pvmput32(cp + 4, pp->pk_tag);
pvmput32(cp + 8, pp->pk_ctx);
pvmput32(cp + 16, pp->pk_wid);
pvmput32(cp + 20, pp->pk_crc);
}
cp -= DDFRAGHDR;
len += DDFRAGHDR;
/*
* save under packet header, because databuf may be shared.
* we don't worry about message header, because it's only at the head.
*/
BCOPY(cp, dummy, sizeof(dummy));
if (cp < pp->pk_buf) {
pvmlogerror("netoutput() no headroom for packet header\n");
return 0;
}
if (pvmdebmask & PDMPACKET) {
pvmlogprintf(
"netoutput() pkt to %s src t%x dst t%x f %s len %d seq %d ack %d retry %d\n",
hp->hd_name, pp->pk_src, pp->pk_dst, pkt_flags(pp->pk_flag),
pp->pk_len, pp->pk_seq, pp->pk_ack, pp->pk_nrt);
}
pvmput32(cp, pp->pk_dst);
pvmput32(cp + 4, pp->pk_src);
pvmput16(cp + 8, pp->pk_seq);
pvmput16(cp + 10, pp->pk_ack);
pvmput32(cp + 12, 0); /* to keep purify happy */
pvmput8(cp + 12, pp->pk_flag);
#if 0
/* drop (don't send) random packets */
if (!(random() & 3)) {
pvmlogerror("netoutput() darn, dropped one\n");
cc = -1;
} else
#endif
if ((cc = sendto(netsock, cp, len, 0,
(struct sockaddr*)&hp->hd_sad, sizeof(hp->hd_sad))) == -1
&& errno != EINTR
#ifndef WIN32
&& errno != ENOBUFS
#endif
#ifdef IMA_LINUX
&& errno != ENOMEM
#endif
) {
pvmlogperror("netoutput() sendto");
#if defined(IMA_SUN4SOL2) || defined(IMA_X86SOL2) || defined(IMA_SUNMP) || defined(IMA_UXPM)
/* life, don't talk to me about life... */
if (errno == ECHILD)
pvmlogerror("this message brought to you by solaris\n");
else
#endif
pvmbailout(0);
}
#ifdef STATISTICS
if (cc == -1)
stats.sdneg++;
else
stats.sdok++;
#endif
BCOPY(dummy, cp, sizeof(dummy)); /* restore under header */
/*
* set timer for next retry
*/
if (cc != -1) {
if ((pp->pk_flag & (FFFIN|FFACK)) == (FFFIN|FFACK)) {
pk_free(pp);
if (hp != hosts->ht_hosts[0]) {
hostfailentry(hp);
clear_opq_of((int)(TIDPVMD | hp->hd_hostpart));
ht_delete(hosts, hp);
if (newhosts)
ht_delete(newhosts, hp);
}
continue;
}
if (!((pp->pk_flag & FFDAT)
|| (pp->pk_flag & (FFFIN|FFACK)) == FFFIN)) {
pk_free(pp);
continue;
}
if (!TVISSET(&pp->pk_at))
pp->pk_at = tnow;
TVXADDY(&pp->pk_rtv, &tnow, &pp->pk_rta);
TVXADDY(&pp->pk_rto, &pp->pk_rto, &pp->pk_rta);
#ifdef STATISTICS
if (pp->pk_nrt)
stats.netret++;
#endif
++pp->pk_nrt;
if (pp->pk_rta.tv_sec < DDMAXRTT) {
TVXADDY(&pp->pk_rta, &pp->pk_rta, &pp->pk_rta);
}
} else {
tx.tv_sec = DDERRRETRY/1000000;
tx.tv_usec = DDERRRETRY%1000000;
TVXADDY(&pp->pk_rtv, &tnow, &tx);
TVXADDY(&pp->pk_rto, &pp->pk_rto, &tx);
}
/* reinsert packet into opq */
LISTDELETE(pp, pk_tlink, pk_trlink);
for (pp2 = opq->pk_trlink; pp2 != opq; pp2 = pp2->pk_trlink)
if (TVXLTY(&pp2->pk_rtv, &pp->pk_rtv))
break;
LISTPUTAFTER(pp2, pp, pk_tlink, pk_trlink);
}
return 0;
}
/* netinput()
*
* Input from a remote pvmd.
* Accept a packet, do protocol stuff then pass pkt to netinpkt().
*/
int
netinput()
{
struct sockaddr_in osad; /* sender's ip addr */
#ifdef SOCKLENISUINT
size_t oslen;
#else
int oslen;
#endif /* sockaddr length */
struct timeval tnow;
struct pkt *pp, *pp2;
struct hostd *hp;
char *cp;
int sqn;
int aqn;
int ff;
int dst;
int src;
int hh;
int already;
struct timeval tdiff; /* packet rtt */
int rttusec;
/*
* alloc new pkt buffer and read packet
*/
pp = pk_new(pvmudpmtu);
if (TDFRAGHDR > DDFRAGHDR)
pp->pk_dat += TDFRAGHDR - DDFRAGHDR;
oslen = sizeof(osad);
if ((pp->pk_len = recvfrom(netsock, pp->pk_dat,
pp->pk_max - (pp->pk_dat - pp->pk_buf),
0, (struct sockaddr*)&osad, &oslen)) == -1) {
if (errno != EINTR)
pvmlogperror("netinput() recvfrom(netsock)");
goto scrap;
}
#if 0
/* drop random packets */
if (!(random() & 3)) {
pvmlogerror("netinput() oops, dropped one\n");
goto scrap;
}
#endif
#ifdef STATISTICS
stats.rfok++;
#endif
cp = pp->pk_dat;
pp->pk_len -= DDFRAGHDR;
pp->pk_dat += DDFRAGHDR;
dst = pp->pk_dst = pvmget32(cp);
src = pp->pk_src = pvmget32(cp + 4);
sqn = pp->pk_seq = pvmget16(cp + 8);
aqn = pvmget16(cp + 10);
ff = pp->pk_flag = pvmget8(cp + 12);
if (ff & FFSOM) {
if (pp->pk_len < MSGHDRLEN) {
pvmlogprintf("netinput() SOM pkt src t%x dst t%x too short\n",
src, dst);
goto scrap;
}
cp += DDFRAGHDR;
pp->pk_enc = pvmget32(cp);
pp->pk_tag = pvmget32(cp + 4);
pp->pk_ctx = pvmget32(cp + 8);
pp->pk_wid = pvmget32(cp + 16);
pp->pk_crc = pvmget32(cp + 20);
pp->pk_len -= MSGHDRLEN;
pp->pk_dat += MSGHDRLEN;
}
/*
* make sure it's from where it claims
*/
hh = (src & tidhmask) >> (ffs(tidhmask) - 1);
if (hh < 0 || hh > hosts->ht_last || !(hp = hosts->ht_hosts[hh])
#ifndef IMA_LINUX
/*
* XXX removing these lines is a hack and reduces security between
* XXX pvmds somewhat, but it's the easiest fix for Linux right now.
*/
|| (osad.sin_addr.s_addr != hp->hd_sad.sin_addr.s_addr)
|| (osad.sin_port != hp->hd_sad.sin_port)
#endif
) {
pvmlogprintf("netinput() bogus pkt from %s\n",
inadport_decimal(&osad));
goto scrap;
}
if (pvmdebmask & PDMPACKET) {
pvmlogprintf(
"netinput() pkt from %s src t%x dst t%x f %s len %d seq %d ack %d\n",
hp->hd_name, src, dst, pkt_flags(ff), pp->pk_len, sqn, aqn);
}
if ((ff & (FFFIN|FFACK)) == (FFFIN|FFACK)) {
if (hh == hosts->ht_master) {
/*
* FIN|ACK from master means we should bailout
*/
if (runstate == PVMDPRIME) {
if (pvmdebmask & PDMSTARTUP)
pvmlogerror("work() PVMDPRIME halting\n");
exit(0);
}
pvmlogprintf("netinput() FIN|ACK from master (%s)\n",
hp->hd_name);
runstate = PVMDHALTING;
} else {
/*
* FIN|ACK from slave means it croaked
*/
pvmlogprintf("netinput() FIN|ACK from %s\n",
hp->hd_name);
hd_dump(hp);
hostfailentry(hp);
clear_opq_of((int)(TIDPVMD | hp->hd_hostpart));
if (hp->hd_hostpart) {
ht_delete(hosts, hp);
if (newhosts)
ht_delete(newhosts, hp);
}
}
goto scrap;
}
/*
* done with outstanding packet covered by this ack
*/
if (ff & FFACK) {
for (pp2 = hp->hd_opq->pk_link; pp2 != hp->hd_opq; pp2 = pp2->pk_link)
if (pp2->pk_seq == aqn) {
if (pp2->pk_flag & FFDAT) {
if (pp2->pk_nrt == 1) {
pvmgetclock(&tnow);
TVXSUBY(&tdiff, &tnow, &pp2->pk_at);
rttusec = tdiff.tv_sec * 1000000 + tdiff.tv_usec;
if (rttusec < 1)
rttusec = 1000; /* XXX const */
else
if (rttusec > DDMAXRTT*1000000)
rttusec = DDMAXRTT*1000000;
rttusec += 3 * (hp->hd_rtt.tv_sec * 1000000 + hp->hd_rtt.tv_usec);
rttusec /= 4;
hp->hd_rtt.tv_sec = rttusec / 1000000;
hp->hd_rtt.tv_usec = rttusec % 1000000;
}
}
if (pp2->pk_flag & FFFIN) {
finack_to_host(hp);
}
hp->hd_nop--;
LISTDELETE(pp2, pk_link, pk_rlink);
pk_free(pp2);
break;
}
}
/*
* move another pkt to output q
*/
/*
if ((hp->hd_opq->pk_link == hp->hd_opq)
*/
if (hp->hd_nop < nopax
&& (hp->hd_txq->pk_link != hp->hd_txq)) {
if (pvmdebmask & PDMPACKET) {
pvmlogprintf("netinput() pkt to opq\n");
}
pp2 = hp->hd_txq->pk_link;
LISTDELETE(pp2, pk_link, pk_rlink);
TVCLEAR(&pp2->pk_rtv);
TVXADDY(&pp2->pk_rta, &hp->hd_rtt, &hp->hd_rtt);
TVCLEAR(&pp2->pk_rto);
TVCLEAR(&pp2->pk_at);
pp2->pk_nrt = 0;
pp2->pk_hostd = hp;
pp2->pk_seq = hp->hd_txseq;
hp->hd_txseq = NEXTSEQNUM(hp->hd_txseq);
LISTPUTBEFORE(hp->hd_opq, pp2, pk_link, pk_rlink);
hp->hd_nop++;
LISTPUTBEFORE(opq, pp2, pk_tlink, pk_trlink);
}
if (!(ff & (FFDAT|FFFIN)))
goto scrap;
/*
* send an ack for the pkt
*/
pp2 = pk_new(DDFRAGHDR); /* XXX could reref a dummy databuf here */
pp2->pk_dat += DDFRAGHDR;
pp2->pk_dst = hp->hd_hostpart | TIDPVMD;
pp2->pk_src = pvmmytid;
pp2->pk_flag = FFACK;
TVCLEAR(&pp2->pk_rtv);
TVCLEAR(&pp2->pk_rta);
TVCLEAR(&pp2->pk_rto);
TVCLEAR(&pp2->pk_at);
pp2->pk_nrt = 0;
pp2->pk_hostd = hp;
pp2->pk_seq = 0;
pp2->pk_ack = sqn;
LISTPUTAFTER(opq, pp2, pk_tlink, pk_trlink);
if (!(ff & FFDAT))
goto scrap;
/*
* if we don't have it already, put it in reordering q
*/
pp2 = 0;
if (SEQLESSTHAN(sqn, hp->hd_rxseq))
already = 1;
else {
already = 0;
for (pp2 = hp->hd_rxq->pk_link; pp2 != hp->hd_rxq; pp2 = pp2->pk_link)
if (!SEQLESSTHAN(pp2->pk_seq,sqn)) {
if (pp2->pk_seq == sqn)
already = 1;
break;
}
}
if (already) {
if (pvmdebmask & PDMPACKET) {
pvmlogprintf("netinput() pkt resent from %s seq %d\n",
hp->hd_name, sqn);
}
goto scrap;
}
LISTPUTBEFORE(pp2, pp, pk_link, pk_rlink);
/*
* accept pkts from reordering q
*/
while (pp = hp->hd_rxq->pk_link,
pp != hp->hd_rxq && pp->pk_seq == hp->hd_rxseq) {
hp->hd_rxseq = NEXTSEQNUM(hp->hd_rxseq);
LISTDELETE(pp, pk_link, pk_rlink);
netinpkt(hp, pp);
}
return 0;
scrap:
if (pp)
pk_free(pp);
return 0;
}
/* netinpkt()
*
* Consume pkt from network. It's either for the pvmd and needs to
* be reassembled into a message or it's for a local task and needs
* to be put on the queue to be sent.
*/
netinpkt(hp, pp)
struct hostd *hp;
struct pkt *pp;
{
struct mca *mcap = 0;
struct task *tp;
struct pmsg *mp;
struct frag *fp;
struct pkt *pp2;
int src = pp->pk_src;
int dst = pp->pk_dst;
int ff = pp->pk_flag;
char *cp;
int i;
int firstmca = 1;
if (pvmdebmask & PDMPACKET) {
pvmlogprintf(
"netinpkt() pkt from %s src t%x dst t%x f %s len %d\n",
hp->hd_name, src, dst, pkt_flags(ff), pp->pk_len);
}
/* throw out packet if it's not for us */
if (TIDISMCA(dst)) {
for (mcap = hp->hd_mcas->mc_link; mcap != hp->hd_mcas;
mcap = mcap->mc_link)
if (mcap->mc_tid == dst)
break;
if (mcap == hp->hd_mcas)
mcap = 0;
}
if ((dst & tidhmask) != myhostpart && !mcap) {
if (pvmdebmask & (PDMPACKET|PDMAPPL)) {
pvmlogprintf(
"netinpkt() pkt from t%x for t%x scrapped (not us)\n",
src, dst);
}
goto done;
}
if (mcap) {
for (i = mcap->mc_ndst; i-- > 0; ) {
dst = mcap->mc_dsts[i];
if (tp = task_find(dst)) { /* to local task */
pp2 = pk_new(0);
pp2->pk_src = src;
pp2->pk_dst = dst;
pp2->pk_flag = ff;
#if defined(IMA_MPP)
if (firstmca)
{
pp2->pk_flag |= (FFMCA | FFMCAWH);
firstmca = 0;
}
else
pp2->pk_flag |= FFMCA;
#endif
pp2->pk_enc = pp->pk_enc;
pp2->pk_tag = pp->pk_tag;
pp2->pk_ctx = pp->pk_ctx;
pp2->pk_wid = pp->pk_wid;
pp2->pk_crc = pp->pk_crc;
pp2->pk_buf = pp->pk_buf;
pp2->pk_max = pp->pk_max;
pp2->pk_dat = pp->pk_dat;
pp2->pk_len = pp->pk_len;
da_ref(pp->pk_buf);
pkt_to_task(tp, pp2);
} else
if (pvmdebmask & (PDMPACKET|PDMAPPL)) {
pvmlogprintf(
"netinpkt() mc pkt from t%x for t%x scrapped (no dst)\n",
src, dst);
}
}
if (ff & FFEOM) {
if (pvmdebmask & PDMMESSAGE) {
pvmlogprintf("netinpkt() freed mca %x from t%x\n",
mcap->mc_tid, hp->hd_name);
}
mca_free(mcap);
}
goto done;
}
if ((dst & ~tidhmask) == TIDPVMD) { /* for pvmd */
if (ff & FFSOM) { /* start of message */
if (hp->hd_rxm) {
pvmlogprintf("netinpkt() repeated start pkt from %s\n",
hp->hd_name);
goto done;
}
hp->hd_rxm = mesg_new(0);
hp->hd_rxm->m_enc = pp->pk_enc;
hp->hd_rxm->m_tag = pp->pk_tag;
hp->hd_rxm->m_ctx = pp->pk_ctx;
hp->hd_rxm->m_wid = pp->pk_wid;
hp->hd_rxm->m_crc = pp->pk_crc;
hp->hd_rxm->m_dst = dst;
hp->hd_rxm->m_src = src;
} else { /* middle or end of message */
if (!hp->hd_rxm) {
pvmlogprintf(
"netinpkt() spurious pkt (no message) from %s\n",
hp->hd_name);
goto done;
}
}
fp = fr_new(0);
fp->fr_buf = pp->pk_buf;
fp->fr_dat = pp->pk_dat;
fp->fr_max = pp->pk_max;
fp->fr_len = pp->pk_len;
da_ref(pp->pk_buf);
LISTPUTBEFORE(hp->hd_rxm->m_frag, fp, fr_link, fr_rlink);
hp->hd_rxm->m_len += fp->fr_len;
if (ff & FFEOM) { /* end of message */
mp = hp->hd_rxm;
hp->hd_rxm = 0;
#ifdef MCHECKSUM
if (mp->m_crc != mesg_crc(mp)) {
pvmlogprintf(
"netinpkt() message from t%x to t%x bad checksum\n",
src, dst);
/* XXX must free message? */
goto done;
}
#endif
mesg_rewind(mp);
if (TIDISTASK(src)) {
if (src == pvmschedtid) {
schentry(mp);
} else
if (pvmdebmask & (PDMMESSAGE|PDMAPPL)) {
pvmlogprintf(
"netinpkt() mesg from t%x to t%x tag %d scrapped\n",
src, dst, mp->m_tag);
/* XXX must free message? */
}
} else {
netentry(hp, mp);
}
}
} else { /* for a task */
if (tp = task_find(dst)) {
#if defined(IMA_PGON) || defined(IMA_I860)
if (TIDISNODE(dst))
mpp_output(tp, pp);
else
#endif
pkt_to_task(tp, pp);
pp = 0;
} else {
if (pvmdebmask & (PDMPACKET|PDMAPPL)) {
pvmlogprintf(
"netinpkt() pkt from t%x for t%x scrapped (no dst)\n",
src, dst);
/* XXX must free message? */
}
goto done;
}
}
done:
if (pp)
pk_free(pp);
return 0;
}
/* loclconn()
*
* Task has attempted to connect. Accept the new connection and make
* a blank context for it.
*/
loclconn()
{
struct task *tp; /* new task context */
#ifdef SOCKLENISUINT
size_t oslen;
#else
int oslen;
#endif
int i;
#ifndef NOUNIXDOM
struct sockaddr_un uns;
#endif
/*
#ifdef SHMEM
return 0;
just getma outa here boy!
#endif
*/
tp = task_new(0);
#ifdef NOUNIXDOM
tp->t_salen = sizeof(tp->t_sad);
if ((tp->t_sock = accept(loclsock, (struct sockaddr*)&tp->t_sad,
&tp->t_salen)) == -1) {
pvmlogperror("loclconn() accept");
task_free(tp);
tp = 0;
} else {
if (pvmdebmask & (PDMPACKET|PDMTASK)) {
pvmlogprintf("loclconn() accept from %s sock %d\n",
inadport_decimal(&tp->t_sad), tp->t_sock);
}
#ifndef NOSOCKOPT
i = 1;
if (setsockopt(tp->t_sock, IPPROTO_TCP, TCP_NODELAY,
(char*)&i, sizeof(int)) == -1) {
pvmlogperror("loclconn() setsockopt");
}
#endif
}
#else /*NOUNIXDOM*/
oslen = sizeof(uns);
if ((tp->t_sock = accept(loclsock, (struct sockaddr*)&uns, &oslen)) == -1) {
pvmlogperror("loclconn() accept");
task_free(tp);
tp = 0;
} else {
if (pvmdebmask & (PDMPACKET|PDMTASK))
pvmlogerror("loclconn() accept\n");
}
#endif /*NOUNIXDOM*/
if (tp) {
#ifndef WIN32
if ((i = fcntl(tp->t_sock, F_GETFL, 0)) == -1)
pvmlogperror("loclconn: fcntl");
else {
#ifdef IMA_RS6K
/* did you ever feel as though your mind had started to erode? */
i |= O_NONBLOCK;
#else /*IMA_RS6K*/
#ifdef O_NDELAY
i |= O_NDELAY;
#else
i |= FNDELAY;
#endif
#endif /*IMA_RS6K*/
(void)fcntl(tp->t_sock, F_SETFL, i);
}
#endif
wrk_fds_add(tp->t_sock, 1);
}
return 0;
}
/* locloutput()
*
* Output to local task. Sends packets until write() blocks.
* Deletes task's bit from wrk_wfds if no more data to send.
*
* Returns 0 if okay, else -1 if unrecoverable error.
*/
locloutput(tp)
struct task *tp;
{
struct pkt *pp;
char *cp;
int len;
int n;
while ((pp = tp->t_txq->pk_link)->pk_buf) {
if (!pp->pk_cpos || pp->pk_cpos < pp->pk_dat) {
/*
* prepend frag [message] headers if we'll be writing them.
*/
cp = pp->pk_dat;
len = pp->pk_len;
if (pp->pk_flag & FFSOM) {
cp -= MSGHDRLEN;
len += MSGHDRLEN;
if (cp < pp->pk_buf) {
pvmlogerror("locloutput() no headroom for message header\n");
return 0;
}
pvmput32(cp, pp->pk_enc);
pvmput32(cp + 4, pp->pk_tag);
pvmput32(cp + 8, pp->pk_ctx);
pvmput32(cp + 16, pp->pk_wid);
pvmput32(cp + 20, pp->pk_crc);
}
cp -= TDFRAGHDR;
if (cp < pp->pk_buf) {
pvmlogerror("locloutput() no headroom for packet header\n");
return 0;
}
pvmput32(cp, pp->pk_dst);
pvmput32(cp + 4, pp->pk_src);
pvmput32(cp + 8, len);
pvmput32(cp + 12, 0); /* to keep purify happy */
pvmput8(cp + 12, pp->pk_flag & (FFSOM|FFEOM));
len += TDFRAGHDR;
}
if (pp->pk_cpos) {
cp = pp->pk_cpos;
len = pp->pk_len + (pp->pk_dat - cp);
} else {
pp->pk_cpos = cp;
if (pvmdebmask & PDMPACKET) {
pvmlogprintf(
"locloutput() src t%x dst t%x f %s len %d\n",
pp->pk_src, pp->pk_dst, pkt_flags(pp->pk_flag), len);
}
}
/*
* send as much as possible; skip to next packet when all sent
*/
#if defined(IMA_RS6K) || defined(IMA_SP2MPI)
n = write(tp->t_sock, cp, min(len, 4096));
#else
#ifndef WIN32
n = write(tp->t_sock, cp, len);
#else
n = win32_write_socket(tp->t_sock,cp,len);
#endif
#endif
#ifdef STATISTICS
if (n == -1)
stats.wrneg++;
else
if (!n)
stats.wrzer++;
else
if (n == len)
stats.wrok++;
else
stats.wrshr++;
#endif
if (n == -1) {
if (errno != EINTR
#ifndef WIN32
&& errno != EWOULDBLOCK
&& errno != ENOBUFS
#endif
&& errno != EAGAIN)
{
#ifdef WIN32
if (GetLastError() != WSAECONNRESET) {
#endif
pvmlogperror("locloutput() write");
pvmlogprintf("locloutput() marking t%x dead\n",
tp->t_tid);
#ifdef WIN32
}
#endif
return -1;
}
break;
}
if (n > 0) {
if (pvmdebmask & PDMPACKET) {
pvmlogprintf(
"locloutput() src t%x dst t%x wrote %d\n",
pp->pk_src, pp->pk_dst, n);
}
if ((len - n) > 0) {
pp->pk_cpos += n;
} else {
#if defined(IMA_CM5) || defined(IMA_SP2MPI)
int dst = pp->pk_dst;
#endif
LISTDELETE(pp, pk_link, pk_rlink);
pk_free(pp);
#if defined(IMA_CM5) || defined(IMA_SP2MPI)
if (TIDISNODE(dst)) {
struct task *tp2;
/* Expensive! But what else can we do? */
if ((tp2 = task_find(dst)) && (tp2->t_flag & TF_CLOSE)) {
mpp_free(tp2);
/* XXX task_cleanup(tp2); */
task_free(tp2);
}
}
#endif /*defined(IMA_CM5) || defined(IMA_SP2MPI)*/
}
} else
break;
}
if (tp->t_txq->pk_link == tp->t_txq) {
wrk_fds_delete(tp->t_sock, 2);
/* flush context if TF_CLOSE set */
if (tp->t_flag & TF_CLOSE)
return -1;
}
return 0;
}
/* loclinput()
*
* Input from a task.
* Accept a packet and pass pkt to loclinpkt().
* Returns 0 else -1 if error (work() should clean up the task context).
*/
loclinput(tp)
struct task *tp;
{
struct pkt *pp = 0;
struct pkt *pp2;
int n, m;
again:
/*
* if no current packet, start a new one
*/
if (!tp->t_rxp) {
tp->t_rxp = pk_new(pvmudpmtu);
/*
tp->t_rxp = pk_new(TDFRAGHDR + 2);
*/
if (DDFRAGHDR > TDFRAGHDR)
tp->t_rxp->pk_dat += DDFRAGHDR - TDFRAGHDR;
}
pp = tp->t_rxp;
/*
* read the fragment header and body separately so we can
* make a bigger buffer if needed
*/
n = (pp->pk_len < TDFRAGHDR) ? 0 : pvmget32(pp->pk_dat + 8);
n += TDFRAGHDR - pp->pk_len;
if (pvmdebmask & PDMPACKET) {
pvmlogprintf("loclinput() t%x fr_len=%d fr_dat=+%d n=%d\n",
tp->t_tid, pp->pk_len, pp->pk_dat - pp->pk_buf, n);
}
#ifndef WIN32
n = read(tp->t_sock, pp->pk_dat + pp->pk_len, n);
#else
n = win32_read_socket(tp->t_sock,pp->pk_dat + pp->pk_len,n);
#endif
if (pvmdebmask & PDMPACKET) {
if (n >= 0) {
pvmlogprintf("loclinput() read=%d\n", n);
} else
pvmlogperror("loclinput() read");
}
#ifdef STATISTICS
switch (n) {
case -1:
stats.rdneg++;
break;
case 0:
stats.rdzer++;
break;
default:
stats.rdok++;
break;
}
#endif
if (n == -1) {
if (errno != EINTR
#ifndef WIN32
&& errno != EWOULDBLOCK
#endif
) {
pvmlogperror("loclinput() read");
pvmlogprintf("loclinput() marking t%x dead\n",
tp->t_tid);
return -1;
}
return 0;
}
if (!n) {
if (pvmdebmask & (PDMPACKET|PDMMESSAGE|PDMTASK)) {
pvmlogprintf("loclinput() read EOF from t%x sock %d\n",
tp->t_tid, tp->t_sock);
}
return -1;
}
if ((pp->pk_len += n) < TDFRAGHDR)
return 0;
/*
* if we have a complete frag, accept it
*/
m = TDFRAGHDR + pvmget32(pp->pk_dat + 8);
if (pp->pk_len == m) {
tp->t_rxp = 0;
pp->pk_dst = pvmget32(pp->pk_dat);
#if defined(IMA_PGON) || defined(IMA_I860) || defined(IMA_CM5) || defined(IMA_SP2MPI)
pp->pk_src = pvmget32(pp->pk_dat + 4);
#else
pp->pk_src = tp->t_tid;
#endif
pp->pk_flag = pvmget8(pp->pk_dat + 12);
pp->pk_len -= TDFRAGHDR;
pp->pk_dat += TDFRAGHDR;
if (pp->pk_flag & FFSOM) {
if (pp->pk_len < MSGHDRLEN) {
pvmlogprintf(
"loclinput() SOM pkt src t%x dst t%x too short\n",
pp->pk_src, pp->pk_dst);
pk_free(pp);
return 0;
}
pp->pk_enc = pvmget32(pp->pk_dat);
pp->pk_tag = pvmget32(pp->pk_dat + 4);
pp->pk_ctx = pvmget32(pp->pk_dat + 8);
pp->pk_wid = pvmget32(pp->pk_dat + 16);
pp->pk_crc = pvmget32(pp->pk_dat + 20);
pp->pk_len -= MSGHDRLEN;
pp->pk_dat += MSGHDRLEN;
}
if (loclinpkt(tp, pp))
return -1;
return 0;
}
/* realloc buffer if frag won't fit */
if (pp->pk_len == TDFRAGHDR) {
if (m > pp->pk_max - (pp->pk_dat - pp->pk_buf)) {
if (!(tp->t_flag & TF_CONN)) {
pvmlogprintf(
"loclinput() unconnected task sends frag length %d (ha)\n",
m);
return -1;
}
if (DDFRAGHDR > TDFRAGHDR) {
pp2 = pk_new(m + DDFRAGHDR - TDFRAGHDR);
pp2->pk_dat += DDFRAGHDR - TDFRAGHDR;
} else
pp2 = pk_new(m);
BCOPY(pp->pk_dat, pp2->pk_dat, TDFRAGHDR);
pp2->pk_len = pp->pk_len;
pk_free(pp);
pp = tp->t_rxp = pp2;
if (pvmdebmask & PDMPACKET) {
pvmlogprintf("loclinput() realloc frag max=%d\n", m);
}
}
goto again;
}
return 0;
}
/* loclinpkt()
*
* Consume pkt from task.
* If it's for the pvmd it needs to be reassembled into a message.
* If for a local or foreign task it needs to be put on a queue to be sent.
* If for a remote pvmd, reassemble as for local then fwd whole message.
* Returns 0 else -1 if error (work() should cleanup the
* task context).
*/
loclinpkt(tp, pp)
struct task *tp;
struct pkt *pp;
{
int dst; /* pkt dst */
int ff; /* pkt flags */
struct pkt *pp2;
struct frag *fp;
struct pmsg *mp;
struct hostd *hp;
struct task *tp2;
#if defined(IMA_CM5) || defined(IMA_SP2MPI)
struct task *socktp = tp; /* owner of the socket */
#endif
dst = pp->pk_dst;
ff = pp->pk_flag;
if (pvmdebmask & PDMPACKET) {
pvmlogprintf(
"loclinpkt() src t%x dst t%x f %s len %d\n",
pp->pk_src, dst, pkt_flags(ff), pp->pk_len);
}
#ifdef IMA_SP2MPI
if (pp->pk_src > 0 && !tp->t_tid && (tp2 = task_findpid(pp->pk_src))) {
/* connect request from pvmhost */
mpp_conn(tp, tp2);
pk_free(pp);
return -1;
}
#endif
#if defined(IMA_PGON) || defined(IMA_I860) || defined(IMA_CM5) || defined(IMA_SP2MPI)
if (TIDISNODE(pp->pk_src)) /* from a node */
if (!(tp = task_find(pp->pk_src))) {
pvmlogprintf("loclinpkt() from unknown task t%x\n", pp->pk_src);
goto done;
}
#endif /*defined(IMA_PGON) || defined(IMA_I860) || defined(IMA_CM5) || defined(IMA_SP2MPI)*/
/*
* if to multicast addr, replicate pkt in each q
*/
if (TIDISMCA(dst) && tp->t_mca && tp->t_mca->mc_tid == dst) {
struct mca *mcap = tp->t_mca;
int i;
for (i = mcap->mc_ndst; i-- > 0; ) {
dst = mcap->mc_dsts[i];
if (hp = tidtohost(hosts, dst)) {
pp2 = pk_new(0);
pp2->pk_src = pp->pk_src;
pp2->pk_dst = mcap->mc_tid;
pp2->pk_flag = ff;
pp2->pk_enc = pp->pk_enc;
pp2->pk_tag = pp->pk_tag;
pp2->pk_ctx = pp->pk_ctx;
pp2->pk_wid = pp->pk_wid;
pp2->pk_crc = pp->pk_crc;
pp2->pk_buf = pp->pk_buf;
pp2->pk_max = pp->pk_max;
pp2->pk_dat = pp->pk_dat;
pp2->pk_len = pp->pk_len;
da_ref(pp->pk_buf);
if (hp->hd_hostpart == myhostpart) {
netinpkt(hp, pp2);
} else {
pkt_to_host(hp, pp2);
}
} else
if (pvmdebmask & (PDMPACKET|PDMAPPL)) {
pvmlogprintf(
"loclinpkt() pkt src t%x dst t%x scrapped (no such host)\n",
pp->pk_src, dst);
}
}
/* free mca on last pkt */
if (ff & FFEOM) {
if (pvmdebmask & PDMMESSAGE) {
pvmlogprintf("loclinpkt() freed mca %x for t%x\n",
mcap->mc_tid, tp->t_tid);
}
mca_free(mcap);
tp->t_mca = 0;
}
goto done;
}
/*
* if to a pvmd, always reassemble (forward if not for us)
*/
if ((dst & ~tidhmask) == TIDPVMD) {
if (ff & FFSOM) { /* start of message */
if (tp->t_rxm) {
pvmlogprintf("loclinpkt() repeated start pkt t%x\n",
tp->t_tid);
goto done;
}
tp->t_rxm = mesg_new(0);
tp->t_rxm->m_ctx = pp->pk_ctx;
tp->t_rxm->m_tag = pp->pk_tag;
tp->t_rxm->m_enc = pp->pk_enc;
tp->t_rxm->m_wid = pp->pk_wid;
tp->t_rxm->m_crc = pp->pk_crc;
tp->t_rxm->m_dst = dst;
tp->t_rxm->m_src = tp->t_tid;
} else { /* middle or end of message */
if (!tp->t_rxm) {
pvmlogprintf(
"loclinpkt() pkt with no message src t%x\n",
tp->t_tid);
goto done;
}
}
fp = fr_new(0);
fp->fr_buf = pp->pk_buf;
fp->fr_dat = pp->pk_dat;
fp->fr_max = pp->pk_max;
fp->fr_len = pp->pk_len;
da_ref(pp->pk_buf);
LISTPUTBEFORE(tp->t_rxm->m_frag, fp, fr_link, fr_rlink);
tp->t_rxm->m_len += fp->fr_len;
if (ff & FFEOM) { /* end of message */
mp = tp->t_rxm;
tp->t_rxm = 0;
#ifdef MCHECKSUM
if (mp->m_crc != mesg_crc(mp)) {
pvmlogprintf(
"loclinpkt() message src t%x dst t%x bad checksum\n",
mp->m_src, dst);
goto done;
}
#endif
if (!(dst & tidhmask) || (dst & tidhmask) == myhostpart) { /* local */
mesg_rewind(mp);
if (mp->m_tag >= (int)SM_FIRST && mp->m_tag <= (int)SM_LAST
&& (mp->m_src == pvmschedtid || mp->m_src == hostertid || mp->m_src == taskertid))
{
schentry(mp);
} else {
loclentry(tp, mp);
}
} else { /* remote */
if (!tp->t_tid) {
pvmlogprintf("loclinpkt() pkt src null dst t%x\n", dst);
goto done;
}
sendmessage(mp);
}
/*
* if sock is -1, tm_conn2() wants us to throw out this context
* because it's been merged into another.
*/
#if defined(IMA_CM5) || defined(IMA_SP2MPI)
/* node procs have no socket; they use pvmhost's */
if (socktp->t_sock == -1)
#else
if (tp->t_sock == -1)
#endif
{
pk_free(pp);
return -1;
}
}
goto done;
}
/*
* if to a task, put in local or remote send queue
*/
if (TIDISTASK(dst)) {
if (!tp->t_tid) {
pvmlogprintf("loclinpkt() pkt src null dst t%x\n", dst);
goto done;
}
if ((dst & tidhmask) == myhostpart) { /* local host */
if (tp2 = task_find(dst)) {
#if defined(IMA_PGON) || defined(IMA_I860)
if (TIDISNODE(dst))
mpp_output(tp2, pp);
else
#endif
pkt_to_task(tp2, pp);
/*
LISTPUTBEFORE(tp2->t_txq, pp, pk_link, pk_rlink);
*/
pp = 0;
} else
if (pvmdebmask & (PDMPACKET|PDMAPPL)) {
pvmlogprintf(
"loclinpkt() pkt src t%x dst t%x scrapped (no such task)\n",
pp->pk_src, dst);
}
} else { /* remote host */
if (hp = tidtohost(hosts, dst)) {
pkt_to_host(hp, pp);
pp = 0;
} else {
if (pvmdebmask & (PDMPACKET|PDMAPPL)) {
pvmlogprintf(
"loclinpkt() pkt src t%x dst t%x scrapped (no such host)\n",
pp->pk_src, dst);
}
goto done;
}
}
}
done:
if (pp)
pk_free(pp);
return 0;
}
/* loclstout()
*
* Read stdout/err pipe from a task.
* Ship it to the output log tid if set, else send it to the master
* pvmd to scribble in its log file.
*/
loclstout(tp)
struct task *tp;
{
int n;
struct pmsg *mp;
static char buf[4000];
#ifndef WIN32
n = read(tp->t_out, buf, sizeof(buf) - 1);
#else
n = win32_read_socket(tp->t_out,buf, sizeof(buf) -1);
#endif
if (n < 1) {
if (n == 0 || (errno != EINTR
#ifndef WIN32
&& errno != EWOULDBLOCK
#endif
)) {
wrk_fds_delete(tp->t_out, 1);
#ifndef WIN32
(void)close(tp->t_out);
#else
(void)_close(tp->t_out);
#endif
tp->t_out = -1;
if (tp->t_outtid > 0) {
mp = mesg_new(0);
mp->m_dst = tp->t_outtid;
mp->m_ctx = tp->t_outctx;
mp->m_tag = tp->t_outtag;
pkint(mp, tp->t_tid);
pkint(mp, 0);
sendmessage(mp);
tp->t_outtid = 0;
}
}
} else {
mp = mesg_new(0);
pkint(mp, tp->t_tid);
pkint(mp, n);
pkbyte(mp, buf, n);
if (tp->t_outtid > 0) {
mp->m_dst = tp->t_outtid;
mp->m_ctx = tp->t_outctx;
mp->m_tag = tp->t_outtag;
} else {
mp->m_tag = DM_TASKOUT;
mp->m_dst = hosts->ht_hosts[hosts->ht_cons]->hd_hostpart | TIDPVMD;
}
sendmessage(mp);
}
return 0;
}
/* mesg_to_task()
*
* Append a message to the send queue for a task.
*
* N.B. Message must contain at least one frag or this will honk.
*/
int
mesg_to_task(tp, mp)
struct task *tp;
struct pmsg *mp;
{
struct frag *fp = mp->m_frag->fr_link;
struct pkt *pp;
int ff = FFSOM; /* frag flags */
int dst = mp->m_dst;
if (pvmdebmask & PDMMESSAGE) {
pvmlogprintf("mesg_to_task() dst t%x tag %s len %d\n",
dst, pvmnametag(mp->m_tag, (int *)0), mp->m_len);
}
/* if nothing yet in q, add task's sock to wrk_wfds */
if (tp->t_sock >= 0)
wrk_fds_add(tp->t_sock, 2);
do {
pp = pk_new(0);
if (ff & FFSOM) {
pp->pk_enc = mp->m_enc;
pp->pk_tag = mp->m_tag;
pp->pk_ctx = mp->m_ctx;
pp->pk_wid = mp->m_wid;
#ifdef MCHECKSUM
pp->pk_crc = mesg_crc(mp);
#else
pp->pk_crc = 0;
#endif
}
pp->pk_buf = fp->fr_buf;
pp->pk_dat = fp->fr_dat;
pp->pk_max = fp->fr_max;
pp->pk_len = fp->fr_len;
da_ref(pp->pk_buf);
if (fp->fr_link == mp->m_frag)
ff |= FFEOM;
pp->pk_src = TIDPVMD;
pp->pk_dst = dst;
pp->pk_flag = ff;
ff = 0;
#if defined(IMA_PGON) || defined(IMA_I860)
if (TIDISNODE(dst)) {
mpp_output(tp, pp);
continue;
}
#endif
#ifdef SHMEM
if ((tp->t_sock < 0) && ( (tp->t_flag & TF_SHMCONN)
|| ((tp->t_flag & TF_PRESHMCONN) && (mp->m_flag & MM_PRIO)) ) )
{
/* If shmem and no d-t socket or fully connected shmem */
/* use mpp routine instead of socket one */
mpp_output(tp, pp);
continue;
}
#endif
if (mp->m_flag & MM_PRIO) {
LISTPUTAFTER(tp->t_txq, pp, pk_link, pk_rlink);
} else {
pkt_to_task(tp, pp);
/*
LISTPUTBEFORE(tp->t_txq, pp, pk_link, pk_rlink);
*/
}
} while ((fp = fp->fr_link) != mp->m_frag);
return 0;
}
/* sendmessage()
*
* Send a message. If it's for a local task or remote host, cut
* apart the fragments and queue to be sent. If it's for the local
* pvmd, just call netentry() with the whole message.
*
* N.B. MM_PRIO only works for single-frag messages.
*/
int
sendmessage(mp)
struct pmsg *mp;
{
struct hostd *hp = 0;
struct task *tp;
struct frag *fp;
struct pkt *pp;
int ff = FFSOM;
int dst = mp->m_dst;
if (!dst) {
pvmlogerror("sendmessage() what? to t0\n");
}
if (pvmdebmask & PDMMESSAGE) {
pvmlogprintf("sendmessage() dst t%x ctx %d tag %s len %d\n",
dst, mp->m_ctx, pvmnametag(mp->m_tag, (int *)0), mp->m_len);
}
/*
* add a frag to empty message to simplify handling
*/
if ((fp = mp->m_frag->fr_link) == mp->m_frag) {
fp = fr_new(MAXHDR);
fp->fr_dat += MAXHDR;
LISTPUTBEFORE(mp->m_frag, fp, fr_link, fr_rlink);
}
/*
* route message
*/
if (!(dst & tidhmask) || (dst & tidhmask) == myhostpart) { /* to local */
if (TIDISTASK(dst)) { /* to local task */
if (tp = task_find(dst)) {
mesg_to_task(tp, mp);
} else
if (pvmdebmask & (PDMMESSAGE|PDMAPPL)) {
pvmlogprintf(
"sendmessage() scrapped, no such task t%x\n",
dst);
}
} else { /* to myself */
mp->m_ref++;
mesg_rewind(mp);
netentry(hosts->ht_hosts[hosts->ht_local], mp);
}
} else { /* to remote */
/* lookup host */
if (runstate == PVMDHTUPD)
hp = tidtohost(newhosts, dst);
if (!hp && !(hp = tidtohost(hosts, dst))) {
if (pvmdebmask & (PDMMESSAGE|PDMAPPL)) {
pvmlogprintf("sendmessage() scrapped, no such host t%x\n",
dst);
}
goto bail;
}
/* packetize frags */
do {
pp = pk_new(0);
if (ff & FFSOM) {
pp->pk_enc = mp->m_enc;
pp->pk_tag = mp->m_tag;
pp->pk_ctx = mp->m_ctx;
pp->pk_wid = mp->m_wid;
#ifdef MCHECKSUM
pp->pk_crc = mesg_crc(mp);
#else
pp->pk_crc = 0;
#endif
}
pp->pk_buf = fp->fr_buf;
pp->pk_dat = fp->fr_dat;
pp->pk_max = fp->fr_max;
pp->pk_len = fp->fr_len;
da_ref(pp->pk_buf);
if (fp->fr_link == mp->m_frag)
ff |= FFEOM;
pp->pk_src = mp->m_src;
pp->pk_dst = dst;
pp->pk_flag = ff;
ff = 0;
if (mp->m_flag & MM_PRIO) {
if (pvmdebmask & (PDMMESSAGE|PDMAPPL))
pvmlogerror("sendmessage() PRIO message to host? (scrapped)\n");
} else {
pkt_to_host(hp, pp);
}
} while ((fp = fp->fr_link) != mp->m_frag);
}
bail:
pmsg_unref(mp);
return 0;
}
/* forkexec()
*
* Search directories in epaths for given file.
* Clean up any files we opened, fork and exec the named process.
* Leave std{out,err} open so the process can whine if it needs to.
*
* Returns 0 if ok (and fills in tpp), else returns PvmNoFile or
* PvmOutOfRes
*
* N.B. must be able to use argv[-1].
*/
#ifndef WIN32
/* too many ifdefs, WIN 32 gets its own */
#ifdef IMA_OS2
#include<process.h>
static int nextfakepid = 10000000; /* XXX fix this */
int *ptr_nfp=&nextfakepid;
#endif
int
forkexec(flags, name, argv, nenv, env, inst, hosttotal, outof, tpp)
int flags; /* exec options */
char *name; /* filename */
char **argv; /* arg list (argv[-1] must be there) */
int nenv; /* num of envars */
char **env; /* envars */
int inst; /* this processes instance */
int hosttotal; /* how many on this host */
int outof; /* how many are being spawned across machine */
struct task **tpp; /* return task context */
{
int tid; /* task tid */
int pid; /* task pid */
int pfd[2]; /* pipe back from task */
struct task *tp; /* new task context */
char path[MAXPATHLEN];
struct stat sb;
char **ep, **eplist;
int i;
struct pmsg *mp; /* message to tasker */
struct waitc *wp;
int ac;
int realrunstate;
char buf[32];
static char *nullep[] = { "", 0 };
#ifndef IMA_OS2
static int nextfakepid = 10000000; /* XXX fix this */
#endif
if ((tid = tid_new()) < 0) {
pvmlogerror("forkexec() out of tids?\n");
return PvmOutOfRes;
}
tp = task_new(tid);
/* search for file */
eplist = CINDEX(name, '/') ? nullep : epaths;
for (ep = eplist; *ep; ep++) {
(void)strcpy(path, *ep);
if (path[0])
(void)strcat(path, "/");
(void)strncat(path, name, sizeof(path) - strlen(path) - 1);
#ifdef IMA_OS2
strcat(path,".exe"); /* no *.cmd !!! */
#endif
if (stat(path, &sb) == -1
|| ((sb.st_mode & S_IFMT) != S_IFREG)
|| !(sb.st_mode & S_IEXEC)) {
if (pvmdebmask & PDMTASK) {
pvmlogprintf("forkexec() stat failed <%s>\n", path);
}
continue;
}
if (taskertid) {
mp = mesg_new(0);
mp->m_tag = SM_STTASK;
mp->m_dst = taskertid;
pkint(mp, tid);
pkint(mp, flags);
pkstr(mp, path);
for (ac = 1; argv[ac]; ac++) ;
pkint(mp, ac);
pkstr(mp, path);
for (i = 1; i < ac; i++)
pkstr(mp, argv[i]);
pkint(mp, nenv + 1);
sprintf(buf, "PVMEPID=%d", nextfakepid);
pkstr(mp, buf);
task_setpid(tp, nextfakepid);
if (++nextfakepid > 20000000)
nextfakepid = 10000000;
for (i = 0; i < nenv; i++)
pkstr(mp, env[i]);
pkint(mp, inst);
pkint(mp, hosttotal);
pkint(mp, outof);
if (pvmdebmask & PDMTASK) {
pvmlogprintf("forkexec() info:: inst %d host %d outof %d\n",
inst, hosttotal, outof);
}
wp = wait_new(WT_TASKSTART);
wp->wa_tid = tid;
wp->wa_on = taskertid;
mp->m_wid = wp->wa_wid;
sendmessage(mp);
if (pvmdebmask & PDMTASK) {
pvmlogprintf("forkexec() sent tasker t%x pid %d\n",
tp->t_tid, tp->t_pid);
}
} else {
#ifdef IMA_TITN
if (socketpair(AF_UNIX, SOCK_STREAM, 0, pfd) == -1) {
pvmlogperror("forkexec() socketpair");
task_free(tp);
return PvmOutOfRes;
}
#else
if (pipe(pfd) == -1) {
pvmlogperror("forkexec() pipe");
task_free(tp);
return PvmOutOfRes;
}
#endif
/*
* switch runstate to is-task before forking to avoid race.
* if we're killed as a task, we don't want to clean up pvmd stuff.
*/
realrunstate = runstate;
runstate = PVMDISTASK;
#if defined(IMA_CSPP) && defined(BALANCED_SPAWN)
pid = cnx_sc_fork(CNX_INHERIT_SC, (int) __get_node_id());
#else
#ifndef IMA_OS2
pid = fork();
#else
pid = os2_spawn(path,argv,nenv,env,
flags&PvmTaskDebug?debugger:0);
#endif
#endif
if (pid)
runstate = realrunstate;
if (!pid) {
/* close any random fds */
dup2(pfd[1], 1);
dup2(1, 2);
for (i = getdtablesize(); --i > 2; )
(void)close(i);
/*
* set envars
*/
while (nenv-- > 0) {
pvmputenv(env[nenv]);
/*
pvmlogprintf("forkexec() putenv(%s)\n", env[nenv]);
*/
}
/*
* put expected pid in environment for libpvm in case
* the process we exec forks before connecting back to the pvmd
*/
sprintf(buf, "PVMEPID=%d", getpid());
pvmputenv(buf);
argv[0] = path;
if (flags & PvmTaskDebug) {
char *p;
argv--;
if (p = getenv("PVM_DEBUGGER"))
argv[0] = p;
else
argv[0] = debugger;
execv(argv[0], argv);
} else {
execv(path, argv);
}
exit(1);
}
if (pid == -1) {
pvmlogperror("forkexec() fork");
(void)close(pfd[0]);
(void)close(pfd[1]);
task_free(tp);
return PvmOutOfRes;
}
(void)close(pfd[1]);
task_setpid(tp, pid);
tp->t_out = pfd[0];
tp->t_flag |= TF_FORKD;
wrk_fds_add(tp->t_out, 1);
if (pvmdebmask & PDMTASK) {
pvmlogprintf("forkexec() new task t%x pid %d pfd=%d\n",
tp->t_tid, tp->t_pid, tp->t_out);
}
}
tp->t_a_out = STRALLOC(name);
*tpp = tp;
return 0;
}
if (pvmdebmask & PDMTASK) {
pvmlogprintf("forkexec() didn't find <%s>\n", name);
}
task_free(tp);
return PvmNoFile;
}
#else
extern char **environ;
static int nextfakepid = 10000000; /* XXX fix this */
int *ptr_nfp = &nextfakepid;
int
forkexec(flags, name, argv, nenv, env, tpp)
int flags; /* exec options */
char *name; /* filename */
char **argv; /* arg list (argv[-1] must be there) */
int nenv; /* num of envars */
char **env; /* envars */
struct task **tpp; /* return task context */
{
int tid; /* task tid */
int pid=-1; /* task pid */
struct task *tp; /* new task context */
char *path;
struct stat sb;
char **ep, **eplist;
int i;
struct mesg *mp; /* message to tasker */
struct waitc *wp;
int ac;
char *expected_pid=0;
char buf[32];
HANDLE hpid;
const char *penv=0;
SECURITY_ATTRIBUTES saPipe;
PROCESS_INFORMATION pi;
STARTUPINFO si; /* for CreateProcess call */
char fixedargv[256];
static char *nullep[] = { "", 0 };
path = (char *) malloc (64 * sizeof(char));
if ((tid = tid_new()) < 0) {
pvmlogerror("forkexec() out of tids?\n");
return PvmOutOfRes;
}
tp = task_new(tid);
/* search for file */
_chdir(getenv("PVM_ROOT"));
eplist = CINDEX(name, '/') ? nullep : epaths;
make_valid(name);
for (ep = eplist; *ep; ep++) {
(void)strcpy(path, *ep);
if (path[0])
(void)strcat(path, "/");
(void)strncat(path, name, sizeof(path) - strlen(path) - 1);
if (stat(path, &sb) == -1
|| ((sb.st_mode & S_IFMT) != S_IFREG)
|| !(sb.st_mode & S_IEXEC)) {
if (pvmdebmask & PDMTASK) {
pvmlogprintf("forkexec() stat failed <%s>\n", path);
}
continue;
}
/* have found the path with executable */
argv[0] = path;
expected_pid=malloc(64 * sizeof(char));
sprintf(expected_pid, "PVMEPID=%d", nextfakepid);
penv=(const char*) expected_pid;
if (_putenv(penv)) {
pvmlogerror("putenv failed !\n");
exit(1);
}
/* concatenate the argv together, glue it */
if (argv[0]) {
strcpy(fixedargv,argv[0]);
strcat(fixedargv," ");
for (i=1; argv[i];i++) {
strcat(fixedargv,argv[i]);
strcat(fixedargv," ");
}
fixedargv[strlen(fixedargv)-1]=0;
}
if (flags & PvmTaskDebug) {
char *pdebug;
if (pdebug = getenv("PVM_DEBUGGER"))
argv[0] = pdebug;
saPipe.nLength = sizeof(SECURITY_ATTRIBUTES);
saPipe.lpSecurityDescriptor = NULL;
saPipe.bInheritHandle = FALSE;
memset(&si, 0, sizeof(si));
si.cb = sizeof(si);
pid = CreateProcess(
argv[0], /* filename */
fixedargv, /* command line for child */
NULL, /* process security descriptor */
NULL, /* thread security descriptor */
FALSE, /* inherit handles? */
DEBUG_PROCESS, /* creation flags */
NULL, /* inherited environment address */
NULL, /* startup dir */
/* NULL = start in current */
&si, /* ptr to startup info (input) */
&pi); /* ptr to process info (output) */
}
else {
saPipe.nLength = sizeof(SECURITY_ATTRIBUTES);
saPipe.lpSecurityDescriptor = NULL;
saPipe.bInheritHandle = FALSE;
memset(&si, 0, sizeof(si));
si.cb = sizeof(si);
pid = CreateProcess(
argv[0], /* filename */
fixedargv, /* command line for child */
NULL, /* process security descriptor */
NULL, /* thread security descriptor */
FALSE, /* inherit handles? */
DETACHED_PROCESS, /* creation flags */
NULL, /* inherited environment address */
NULL, /* startup dir */
/* NULL = start in current */
&si, /* ptr to startup info (input) */
&pi); /* ptr to process info (output) */
}
if (pid == -1) {
pvmlogperror("forkexec() _spawnve");
task_free(tp);
return PvmOutOfRes;
}
CloseHandle(pi.hThread);
task_sethandle(tp,pi.hProcess);
task_setpid(tp,nextfakepid);
nextfakepid++;
tp->t_flag |= TF_FORKD;
if (pvmdebmask & PDMTASK) {
pvmlogprintf("forkexec() new task t%x pid %d pfd=%d\n",
tp->t_tid, tp->t_pid, tp->t_out);
}
tp->t_a_out = STRALLOC(name);
*tpp = tp;
return 0;
} /* for */
if (pvmdebmask & PDMTASK) {
pvmlogprintf("forkexec() didn't find <%s>\n", name);
}
task_free(tp);
return PvmNoFile;
}
/* convenience for pvm programs, just program your executable without */
/* any .exe appendix. added on WIN32 machines automatically. make it */
/* valid */
int make_valid(char *n)
{
char *appendix=".exe";
int lenapp=0;
char *position=0;
int i=strlen(n)-1;
int j;
lenapp= strlen(appendix)-1;
if ((i= i- lenapp) > 0) {
position=n;
for (j=0;j<i;j++) position++;
/* have to append appendix */
if (strncmp(position,appendix,lenapp))
strcat(n,appendix);
} else strcat(n,appendix);
}
#endif
/* beprime()
*
* Pvmd[master] becomes pvmd'[master].
* Set runstate, make ppnetsock the real netsock, close loclsock.
*/
beprime()
{
struct htab *htp;
struct task *tp;
int i;
runstate = PVMDPRIME;
if ((pvmmyupid = getpid()) == -1) {
pvmlogerror("beprime() can't getpid()\n");
pvmbailout(0);
}
myhostpart = 0;
pvmmytid = TIDPVMD;
htp = ht_new(hosts->ht_local);
htp->ht_master = hosts->ht_local;
htp->ht_local = 0;
ht_insert(htp, hosts->ht_hosts[hosts->ht_local]);
ht_insert(htp, hosts->ht_hosts[0]);
htp->ht_hosts[htp->ht_master]->hd_txseq
= htp->ht_hosts[0]->hd_rxseq;
htp->ht_hosts[htp->ht_master]->hd_rxseq
= htp->ht_hosts[0]->hd_txseq;
oldhosts = hosts;
hosts = htp;
#ifndef NOUNIXDOM
loclspath = 0;
#endif
(void)close(loclsock);
loclsock = -1;
loclsnam = 0;
(void)close(netsock);
netsock = ppnetsock;
ppnetsock = -1;
locltasks = 0;
task_init();
/* close everything but netsock, log_fd and 0, 1, 2 */
for (i = getdtablesize(); --i > 2; )
if (i != netsock && i != log_fd)
(void)close(i);
wrk_fds_init();
wrk_fds_add(netsock, 1);
opq = pk_new(0);
opq->pk_tlink = opq->pk_trlink = opq;
wdead = 0;
rdead = 0;
return 0;
}
/* pkt_to_host()
*
* Add data pkt to send queue (txq) for a host. Consume the pkt.
* If data plus header length is greater than host mtu,
* refragment into >1 pkts.
*
* We have to pay special attention to the FFSOM packet - make it
* shorter so there's room to prepend the message header later.
*
* If send window to host has room, push packet to opq.
*/
int
pkt_to_host(hp, pp)
struct hostd *hp;
struct pkt *pp;
{
int maxl = (hp->hd_mtu < pvmudpmtu ? hp->hd_mtu : pvmudpmtu) - DDFRAGHDR;
int llim = pp->pk_flag & FFSOM ? maxl - MSGHDRLEN : maxl;
pp->pk_flag = (pp->pk_flag & (FFSOM|FFEOM)) | FFDAT;
if (pvmdebmask & PDMPACKET) {
pvmlogprintf("pkt_to_host() pkt src t%x dst t%x f %s len %d\n",
pp->pk_src, pp->pk_dst, pkt_flags(pp->pk_flag), pp->pk_len);
}
if (pp->pk_len <= llim) {
LISTPUTBEFORE(hp->hd_txq, pp, pk_link, pk_rlink);
} else {
struct pkt *pp2;
char *cp = pp->pk_dat;
int togo;
int n;
int ff = pp->pk_flag & FFSOM;
int fe = pp->pk_flag & FFEOM;
for (togo = pp->pk_len; togo > 0; togo -= n) {
n = min(togo, llim);
if ((pvmdebmask & PDMPACKET) && togo != pp->pk_len) {
pvmlogprintf("pkt_to_host() refrag len %d\n", n);
}
#ifdef STATISTICS
stats.refrag++;
#endif
pp2 = pk_new(0);
pp2->pk_src = pp->pk_src;
pp2->pk_dst = pp->pk_dst;
if (n == togo)
ff |= fe;
pp2->pk_flag = ff | FFDAT;
ff = 0;
llim = maxl;
pp2->pk_enc = pp->pk_enc;
pp2->pk_tag = pp->pk_tag;
pp2->pk_ctx = pp->pk_ctx;
pp2->pk_wid = pp->pk_wid;
pp2->pk_crc = pp->pk_crc;
pp2->pk_buf = pp->pk_buf;
pp2->pk_max = pp->pk_max;
pp2->pk_dat = cp;
pp2->pk_len = n;
da_ref(pp->pk_buf);
cp += n;
LISTPUTBEFORE(hp->hd_txq, pp2, pk_link, pk_rlink);
}
pk_free(pp);
}
while (hp->hd_nop < nopax
&& (hp->hd_txq->pk_link != hp->hd_txq)) {
if (pvmdebmask & PDMPACKET) {
pvmlogprintf("pkt_to_host() pkt to opq\n");
}
pp = hp->hd_txq->pk_link;
LISTDELETE(pp, pk_link, pk_rlink);
TVCLEAR(&pp->pk_rtv);
TVXADDY(&pp->pk_rta, &hp->hd_rtt, &hp->hd_rtt);
TVCLEAR(&pp->pk_rto);
TVCLEAR(&pp->pk_at);
pp->pk_nrt = 0;
pp->pk_hostd = hp;
pp->pk_seq = hp->hd_txseq;
hp->hd_txseq = NEXTSEQNUM(hp->hd_txseq);
pp->pk_ack = 0;
LISTPUTBEFORE(hp->hd_opq, pp, pk_link, pk_rlink);
hp->hd_nop++;
LISTPUTBEFORE(opq, pp, pk_tlink, pk_trlink);
}
return 0;
}
int
fin_to_host(hp)
struct hostd *hp;
{
struct pkt *pp;
if (pvmdebmask & PDMPACKET) {
pvmlogprintf("fin_to_host() %s\n", hp->hd_name);
}
pp = pk_new(DDFRAGHDR); /* XXX could reref a dummy databuf here */
pp->pk_dat += DDFRAGHDR;
pp->pk_dst = hp->hd_hostpart | TIDPVMD;
pp->pk_src = pvmmytid;
pp->pk_flag = FFFIN;
TVCLEAR(&pp->pk_rtv);
TVXADDY(&pp->pk_rta, &hp->hd_rtt, &hp->hd_rtt);
TVCLEAR(&pp->pk_rto);
TVCLEAR(&pp->pk_at);
pp->pk_nrt = 0;
pp->pk_hostd = hp;
pp->pk_seq = hp->hd_txseq;
hp->hd_txseq = NEXTSEQNUM(hp->hd_txseq);
pp->pk_ack = 0;
LISTPUTBEFORE(hp->hd_opq, pp, pk_link, pk_rlink);
hp->hd_nop++;
LISTPUTAFTER(opq, pp, pk_tlink, pk_trlink);
return 0;
}
int
finack_to_host(hp)
struct hostd *hp;
{
struct pkt *pp;
if (pvmdebmask & PDMPACKET) {
pvmlogprintf("finack_to_host() %s\n", hp->hd_name);
}
pp = pk_new(DDFRAGHDR); /* XXX could reref a dummy databuf here */
pp->pk_dat += DDFRAGHDR;
pp->pk_dst = hp->hd_hostpart | TIDPVMD;
pp->pk_src = pvmmytid;
pp->pk_flag = FFFIN|FFACK;
TVCLEAR(&pp->pk_rtv);
TVCLEAR(&pp->pk_rta);
TVCLEAR(&pp->pk_rto);
TVCLEAR(&pp->pk_at);
pp->pk_nrt = 0;
pp->pk_hostd = hp;
pp->pk_seq = 0;
pp->pk_ack = 0;
LISTPUTAFTER(opq, pp, pk_tlink, pk_trlink);
return 0;
}
/* pkt_to_task()
*
* Add data pkt to send queue (txq) for a task. Consume the pkt.
* If data plus header length is greater than task mtu,
* refragment into >1 pkts.
*/
int
pkt_to_task(tp, pp)
struct task *tp;
struct pkt *pp;
{
if (tp->t_sock >= 0 && (tp->t_flag & TF_CONN)
#ifdef SHMEM
&& !(tp -> t_flag & TF_SHM) /* don't add socket if a shm task */
#endif
)
wrk_fds_add(tp->t_sock, 2);
#if defined(IMA_PGON) || defined(IMA_I860)
if (TIDISNODE(pp->pk_dst))
mpp_output(tp, pp);
else
#endif
#ifdef SHMEM
if (((tp->t_sock < 0) && (tp->t_flag & TF_SHMCONN))
|| (tp->t_flag & TF_SHMCONN) )
mpp_output(tp, pp);
else
#endif
#ifdef LocalRefragmentTest
if (pp->pk_len + DDFRAGHDR <= pvmudpmtu) {
LISTPUTBEFORE(tp->t_txq, pp, pk_link, pk_rlink);
} else {
struct pkt *pp2;
int maxl = pvmudpmtu - DDFRAGHDR;
char *cp = pp->pk_dat;
int togo;
int n;
int ff = pp->pk_flag & FFSOM;
int fe = pp->pk_flag & FFEOM;
for (togo = pp->pk_len; togo > 0; togo -= n) {
n = min(togo, maxl);
pvmlogprintf("pkt_to_task() refrag len %d\n", n);
pp2 = pk_new(0);
pp2->pk_src = pp->pk_src;
pp2->pk_dst = pp->pk_dst;
if (n == togo)
ff |= fe;
pp2->pk_flag = ff | FFDAT;
ff = 0;
pp2->pk_enc = pp->pk_enc;
pp2->pk_tag = pp->pk_tag;
pp2->pk_ctx = pp->pk_ctx;
pp2->pk_wid = pp->pk_wid;
pp2->pk_crc = pp->pk_crc;
pp2->pk_buf = pp->pk_buf;
pp2->pk_max = pp->pk_max;
pp2->pk_dat = cp;
pp2->pk_len = n;
da_ref(pp->pk_buf);
cp += n;
LISTPUTBEFORE(tp->t_txq, pp2, pk_link, pk_rlink);
}
pk_free(pp);
}
#else /*LocalRefragmentTest*/
{
if (pvmdebmask & PDMMESSAGE)
pvmlogprintf("pkt_to_task: queueing %x \n", pp->pk_dst);
LISTPUTBEFORE(tp->t_txq, pp, pk_link, pk_rlink);
}
#endif /*LocalRefragmentTest*/
return 0;
}
#ifdef STATISTICS
dump_statistics()
{
pvmlogprintf(" select: rdy %d, zero %d, neg %d\n",
stats.selrdy, stats.selzer, stats.selneg);
pvmlogprintf(" sendto: ok %d, neg %d recvfrom: ok %d\n",
stats.sdok, stats.sdneg, stats.rfok);
pvmlogprintf(" read: pos %d, zero %d, neg %d\n",
stats.rdok, stats.rdzer, stats.rdneg);
pvmlogprintf(" write: ok %d, short %d, zero %d, neg %d\n",
stats.wrok, stats.wrshr, stats.wrzer, stats.wrneg);
pvmlogprintf(" refrags: %d\n", stats.refrag);
pvmlogprintf(" netwk resends: %d\n", stats.netret);
return 0;
}
reset_statistics()
{
BZERO((char*)&stats, sizeof(stats));
return 0;
}
#endif /*STATISTICS*/
#if defined(IMA_CSPP) && defined(BALANCED_SPAWN)
static int number_nodes = -1;
static int number_cpus = -1;
static cnx_scid_t scid_num;
static int
__get_node_id()
{
static int current_node = 0;
static int current_cpu = 0;
if (number_nodes == -1) {
number_nodes = get_number_nodes();
number_cpus = get_number_cpus(current_node);
goto done;
}
if (number_nodes == 1) {
goto done;
}
if (current_cpu < (number_cpus - 1) ) {
current_cpu++;
} else {
current_cpu = 0;
if (current_node < (number_nodes - 1)) {
current_node++;
number_cpus = get_number_cpus(current_node);
} else {
current_node = 0;
number_cpus = get_number_cpus(current_node);
if (pvmdebmask & PDMTASK) {
pvmlogerror (
"Warning:pvm_spawn restarting process placement on Node 0");
}
}
}
done:
return current_node;
}
static cnx_is_scnode_basic_info_data_t sc_info[CNX_MAX_NODES];
static int
get_number_nodes()
{
cnx_is_target_data_t target;
int ret;
cnx_pattributes_t pattr;
int val;
char errortxt[128];
cnx_getpattr(getpid(), CNX_PATTR_SCID, &pattr);
scid_num = pattr.pattr_scid;
cnx_sysinfo_target_scnode(&target, scid_num, CNX_IS_ALL_NODES);
ret = cnx_sysinfo(
CNX_IS_SCNODE_BASIC_INFO,
(void *) &target,
sc_info,
CNX_MAX_NODES,
CNX_IS_SCNODE_BASIC_INFO_COUNT,
(unsigned *) &val);
if (ret == -1) {
sprintf(errortxt,
"Error calling cnx_sysinfo in %s:line %d errno: %d \n",
__FILE__, __LINE__, errno);
pvmlogerror(errortxt);
exit (-1);
}
return val;
}
static int
get_number_cpus(int current_node)
{
return sc_info[current_node].num_cpus;
}
#endif /*defined(IMA_CSPP) && defined(BALANCED_SPAWN)*/
/* mksocs()
*
* Make UDP sockets netsock and ppnetsock. Make TCP master socket
* loclsock.
*
* Returns 0 if ok,
* else 2 if pvmd already running,
* else 1.
*/
int
mksocs()
{
struct hostd *hp = hosts->ht_hosts[hosts->ht_local];
struct hostd *hp0 = hosts->ht_hosts[0];
struct sockaddr_in sin;
char buf[128];
char *sfn;
#ifndef WIN32
int d;
#else
HANDLE d;
int e;
#endif
#ifndef NOSOCKOPT
int bsz;
#endif
char *p;
#ifdef SOCKLENISUINT
size_t oslen;
#else
int oslen;
#endif
int cc;
#ifndef NOUNIXDOM
char spath[LEN_OF_TMP_NAM]; /* local socket path */
struct sockaddr_un uns;
#endif
/*
* make pvmd-pvmd socket
*/
if ((netsock = socket(AF_INET, SOCK_DGRAM, 0)) == -1) {
pvmlogperror("mksocs() socket netsock");
return 1;
}
hp->hd_sad.sin_port = 0;
if (bind(netsock, (struct sockaddr*)&hp->hd_sad, sizeof(hp->hd_sad)) == -1)
{
pvmlogperror("mksocs() bind netsock");
return 1;
}
oslen = sizeof(hp->hd_sad);
if (getsockname(netsock, (struct sockaddr*)&hp->hd_sad, &oslen) == -1) {
pvmlogperror("mksocs() getsockname netsock");
return 1;
}
/*
* make pvmd-pvmd' socket
*/
if ((ppnetsock = socket(AF_INET, SOCK_DGRAM, 0)) == -1) {
pvmlogperror("mksocs() socket ppnetsock");
return 1;
}
hp0->hd_sad.sin_port = 0;
if (bind(ppnetsock, (struct sockaddr*)&hp0->hd_sad, sizeof(hp0->hd_sad))
== -1) {
pvmlogperror("mksocs() bind ppnetsock");
return 1;
}
oslen = sizeof(hp0->hd_sad);
if (getsockname(ppnetsock, (struct sockaddr*)&hp0->hd_sad, &oslen) == -1) {
pvmlogperror("mksocs() getsockname ppnetsock");
return 1;
}
/*
* make pvmd-local task socket
*/
#ifdef NOUNIXDOM
if ((loclsock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
pvmlogperror("mksocs() socket loclsock");
return 1;
}
/*
* first try localhost address (loopback) then regular address
* XXX 127.0.0.1 is a hack, we should really gethostbyaddr()
*/
BZERO((char*)&sin, sizeof(sin));
#ifdef IMA_SP2MPI
sin = hp->hd_sad; /* allow task to connect from a node */
#else
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = htonl(0x7f000001);
sin.sin_port = 0;
#endif
if (bind(loclsock, (struct sockaddr*)&sin, sizeof(sin)) == -1) {
sin = hp->hd_sad;
if (bind(loclsock, (struct sockaddr*)&sin, sizeof(sin)) == -1) {
pvmlogperror("mksocs() bind loclsock");
return 1;
}
}
oslen = sizeof(sin);
if (getsockname(loclsock, (struct sockaddr*)&sin, &oslen) == -1) {
pvmlogperror("mksocs() getsockname loclsock");
return 1;
}
if (listen(loclsock, SOMAXCONN) == -1) {
pvmlogperror("mksocs() listen loclsock");
return 1;
}
#ifndef NOSOCKOPT
bsz = pvmudpmtu * 2;
if (setsockopt(netsock, SOL_SOCKET, SO_SNDBUF,
(char*)&bsz, sizeof(bsz)) == -1
|| setsockopt(netsock, SOL_SOCKET, SO_RCVBUF,
(char*)&bsz, sizeof(bsz)) == -1
|| setsockopt(ppnetsock, SOL_SOCKET, SO_SNDBUF,
(char*)&bsz, sizeof(bsz)) == -1
|| setsockopt(ppnetsock, SOL_SOCKET, SO_RCVBUF,
(char*)&bsz, sizeof(bsz)) == -1) {
pvmlogperror("mksocs() setsockopt");
return 1;
}
#endif /*NOSOCKOPT*/
p = inadport_hex(&sin);
#else /*NOUNIXDOM*/
if ((loclsock = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
pvmlogperror("mksocs() socket loclsock");
return 1;
}
BZERO((char*)&uns, sizeof(uns));
uns.sun_family = AF_UNIX;
spath[0] = 0;
(void)TMPNAMFUN(spath);
strcpy(uns.sun_path, spath);
/*
XXX len?
*/
if (bind(loclsock, (struct sockaddr*)&uns, sizeof(uns)) == -1) {
pvmlogperror("mksocs() bind loclsock");
return 1;
}
if (listen(loclsock, SOMAXCONN) == -1) {
pvmlogperror("mksocs() listen loclsock");
return 1;
}
loclspath = STRALLOC(spath);
p = spath;
#endif /*NOUNIXDOM*/
/*
* make pvmd-local task socket address file
*/
if (!(sfn = pvmdsockfile())) {
pvmlogerror("mksocs() pvmdsockfile() failed\n");
pvmbailout(0);
}
#ifndef WIN32
if ((d = open(sfn, O_CREAT|O_EXCL|O_WRONLY|O_TRUNC, 0600)) == -1) {
if (errno == EEXIST) {
#else
if (((HANDLE) d = win32_create_file(sfn, CREATE_NEW))
== (HANDLE) -2) {
/* this code is for WIN95 */
system_loser_win = TRUE;
e = _open(sfn,O_CREAT|O_EXCL|O_WRONLY|O_TRUNC, 0600);
}
if ((d == (HANDLE) -1) || (e ==-1)) {
if (1) { /* errno == EEXIST */
#endif
#ifndef OVERLOADHOST
(void)pvmlogprintf(
"mksocs() %s exists. pvmd already running?\n", sfn);
return 2;
#endif
} else {
pvmlogperror(sfn);
pvmlogerror("mksocs() can't write address file\n");
return 1;
}
} else {
#ifndef WIN32
cc = write(d, p, strlen(p));
#else
CloseHandle(d);
if (!system_loser_win) {
if ((d = (HANDLE) win32_open_file(sfn)) == (HANDLE) -1) {
pvmlogprintf("could not open file %s: %d \n",
sfn,GetLastError());
return 2;
}
cc = win32_write_file(d,p,strlen(p));
} else
/* the win95 case */
cc = _write(e,p,strlen(p));
#endif
if (cc != strlen(p)) {
if (cc == -1) {
pvmlogperror(sfn);
pvmlogerror("mksocs() can't write address file\n");
} else {
(void)pvmlogprintf(
"mksocs() aargh, short write on %s: %d\n",
sfn, cc);
pvmlogerror("mksocs() is the partition full?\n");
}
#ifndef WIN32
(void)close(d);
#else
if (d)
CloseHandle(d);
if (e)
close(e);
#endif
(void)unlink(sfn);
return 1;
}
loclsnam = STRALLOC(sfn);
#ifndef WIN32
(void)close(d);
#else
if (d)
win32_close_file(d);
if (e)
close(e);
#endif
}
/* set PVMSOCK envar */
sprintf(buf, "PVMSOCK=%s", p);
p = STRALLOC(buf);
pvmputenv(p);
return 0;
}
/* colonsep()
*
* Break string into substrings on ':' delimiter.
* Return null-terminated array of strings, in new malloc'd space.
* Modifies the original string.
*/
char **
colonsep(s)
char *s; /* the string to break up */
{
char **els;
int nel = 2; /* length of els */
char *p, *q;
#ifdef IMA_OS2
for (p = s; p = CINDEX(p, ';'); p++)
#else
for (p = s; p = CINDEX(p, ':'); p++)
#endif
nel++;
els = TALLOC(nel, char*, "path");
nel = 0;
for (p = s; p; p = q) {
#ifdef IMA_OS2
if (q = CINDEX(p, ';'))
#else
if (q = CINDEX(p, ':'))
#endif
*q++ = 0;
els[nel++] = p;
}
els[nel] = 0;
return els;
}
/* varsub()
*
* Substitute environment variables into string.
* Variables named by $NAME or ${NAME}.
* Return string in new malloc'd space.
*/
char *
varsub(s)
char *s;
{
int rm = 8; /* length of result string space */
char *r; /* result string */
int rl = 0;
char *p;
char *vn, *vv;
char c;
int l;
r = TALLOC(rm, char, "var");
while (*s) {
for (p = s; *p && *p != '$'; p++) ;
if (l = p - s) {
if (rl + l >= rm) {
rm = rl + l + 1;
r = TREALLOC(r, rm, char);
}
strncpy(r + rl, s, l);
rl += l;
}
s = p++;
if (*s == '$') {
if (*p == '{')
p++;
vn = p;
while (isalnum(*p) || *p == '_')
p++;
c = *p;
*p = 0;
vv = getenv(vn);
*p = c;
if (*p == '}')
p++;
if (vv)
l = strlen(vv);
else {
vv = s;
l = p - s;
}
if (l) {
if (rl + l >= rm) {
rm = rl + l + 1;
r = TREALLOC(r, rm, char);
}
strncpy(r + rl, vv, l);
rl += l;
}
s = p;
}
}
r[rl] = 0;
return r;
}
/* crunchzap()
*
* Parse a string into words delimited by <> pairs.
* Max number of words is original value of *acp.
*
* Trashes out the original string.
* Returns 0 with av[0]..av[*acp - 1] pointing to the words.
* Returns 1 if too many words.
*/
int
crunchzap(s, acp, av)
char *s; /* the string to parse */
int *acp; /* max words in, ac out */
char **av; /* pointers to args */
{
register int ac;
register char *p = s;
register int n = *acp;
/* separate out words of command */
ac = 0;
while (*p) {
while (*p && *p++ != '<');
if (*p) {
if (ac >= n) {
/* command too long */
*acp = ac;
return 1;
}
av[ac++] = p;
while (*p && *p != '>') p++;
if (*p) *p++ = 0;
}
}
*acp = ac;
return 0;
}
/* master_config()
*
* Master pvmd. Config a host table with length 1.
*/
int
master_config(hn, argc, argv)
char *hn; /* hostname or null */
int argc;
char **argv;
{
struct hostent *he;
struct hostd *hp;
struct hostd *hp2;
int i;
char *s;
if (argc > 2) {
pvmlogerror("usage: pvmd3 [-ddebugmask] [-nhostname] [hostfile]\n");
pvmbailout(0);
}
if (argc == 2) {
filehosts = readhostfile(argv[1]);
}
if (pvmdebmask & PDMSTARTUP) {
if (filehosts) {
pvmlogerror("master_config() host file:\n");
ht_dump(filehosts);
} else
pvmlogerror("master_config() null host file\n");
}
hosts = ht_new(1);
hosts->ht_serial = 1;
hosts->ht_master = 1;
hosts->ht_cons = 1;
hosts->ht_local = 1;
hp = hd_new(1);
hp->hd_name = STRALLOC(hn);
hp->hd_arch = STRALLOC(myarchname);
hp->hd_mtu = pvmudpmtu;
hp->hd_dsig = pvmmydsig;
ht_insert(hosts, hp);
hd_unref(hp);
hp = hd_new(0);
hp->hd_name = STRALLOC("pvmd'");
hp->hd_arch = STRALLOC(myarchname);
hp->hd_mtu = pvmudpmtu;
hp->hd_dsig = pvmmydsig;
ht_insert(hosts, hp);
hd_unref(hp);
/*
* get attributes from host file if available
*/
hp = hosts->ht_hosts[1];
if (filehosts &&
((hp2 = nametohost(filehosts, hp->hd_name))
|| (hp2 = filehosts->ht_hosts[0]))) {
applydefaults(hp, hp2);
}
if (!hp->hd_epath) {
if ((s = getenv("PVM_PATH")))
hp->hd_epath = STRALLOC(s);
else
hp->hd_epath = STRALLOC(DEFBINDIR);
}
epaths = colonsep(varsub(hp->hd_epath));
if (!hp->hd_bpath)
hp->hd_bpath = STRALLOC(DEFDEBUGGER);
debugger = varsub(hp->hd_bpath);
if (!hp->hd_wdir) {
if ((s = getenv("PVM_WD")))
hp->hd_wdir = STRALLOC(s);
else
hp->hd_wdir = STRALLOC(pvmgethome());
}
s = varsub(hp->hd_wdir);
if (chdir(s) == -1)
pvmlogperror(s);
PVM_FREE(s);
if (!(he = gethostbyname(hp->hd_aname ? hp->hd_aname : hp->hd_name))) {
pvmlogprintf("master_config() %s: can't gethostbyname\n", hn);
pvmbailout(0);
}
BCOPY(he->h_addr_list[0], (char*)&hp->hd_sad.sin_addr,
sizeof(struct in_addr));
hp = hosts->ht_hosts[0];
BCOPY(he->h_addr_list[0], (char*)&hp->hd_sad.sin_addr,
sizeof(struct in_addr));
if (pvmdebmask & (PDMHOST|PDMSTARTUP)) {
pvmlogerror("master_config() host table:\n");
ht_dump(hosts);
}
if (mksocs())
pvmbailout(0);
#ifndef WIN32
/* close everything but our sockets */
for (i = getdtablesize(); --i > 2; )
/* XXX don't like this - hard to maintain */
if (i != netsock && i != ppnetsock && i != loclsock && i != log_fd)
(void)close(i);
/* reopen 0, 1, 2*/
(void)open("/dev/null", O_RDONLY, 0);
(void)open("/dev/null", O_WRONLY, 0);
(void)dup2(1, 2);
#endif
pvmsetlog(2);
runstate = PVMDNORMAL;
return 0;
}
/* slave_config()
*
* Slave pvmd being started by master. Trade minimal config info
* so we can send packets back and forth.
*/
int
slave_config(hn, argc, argv)
char *hn;
int argc;
char **argv;
{
int lh; /* local host index */
int mh; /* master host index */
struct hostd *hp;
int i, j;
int ac;
int ms = 0; /* manual (humanoid) startup */
#ifndef WIN32
int dof = 1; /* fork, exit parent (default) */
#else
int dof = 0;
#endif
int bad = 0;
char *p;
char *s;
for (i = j = ac = 1; i < argc; i++) {
if (argv[i][0] == '-') {
switch (argv[i][1]) {
case 'S':
ms = 1;
break;
case 'f':
dof = 0;
break;
default:
pvmlogprintf("slave_config() unknown switch: %s\n", argv[i]);
bad++;
}
} else {
argv[j++] = argv[i];
ac++;
}
}
argc = ac;
if (bad || argc != 6) {
pvmlogerror("slave_config: bad args\n");
pvmbailout(0);
}
mh = atoi(argv[1]);
lh = atoi(argv[4]);
hosts = ht_new(1);
hosts->ht_serial = 1;
hosts->ht_master = mh;
hosts->ht_cons = mh;
hosts->ht_local = lh;
hp = hd_new(mh);
hp->hd_name = STRALLOC("?");
hex_inadport(argv[2], &hp->hd_sad);
hp->hd_mtu = atoi(argv[3]);
ht_insert(hosts, hp);
hd_unref(hp);
hp = hd_new(0);
hp->hd_name = STRALLOC("pvmd'");
hp->hd_arch = STRALLOC(myarchname);
hp->hd_mtu = pvmudpmtu;
hp->hd_dsig = pvmmydsig;
hex_inadport(argv[5], &hp->hd_sad);
ht_insert(hosts, hp);
hd_unref(hp);
hp = hd_new(lh);
hp->hd_name = STRALLOC(hn);
hp->hd_arch = STRALLOC(myarchname);
hp->hd_mtu = pvmudpmtu;
hp->hd_dsig = pvmmydsig;
hex_inadport(argv[5], &hp->hd_sad);
ht_insert(hosts, hp);
hd_unref(hp);
if (i = mksocs()) {
if (i == 2) {
printf("PvmDupHost\n");
fflush(stdout);
}
pvmbailout(0);
}
printf("ddpro<%d> arch<%s> ip<%s> mtu<%d> dsig<%d>\n",
DDPROTOCOL,
myarchname,
inadport_hex(&hp->hd_sad),
pvmudpmtu,
pvmmydsig);
fflush(stdout);
if (!ms)
#ifndef WIN32
#ifdef IMA_OS2
/* --- */
#else
(void)read(0, (char*)&i, 1);
#endif
#else
(void)_read(0, (char*)&i, 1);
#endif
#ifndef WIN32
if (dof) {
if (i = fork()) {
if (i == -1)
pvmlogperror("slave_config() fork");
exit(0);
}
/* close everything but our sockets */
for (i = getdtablesize(); --i >= 0; )
/* XXX don't like this - hard to maintain */
if (i != netsock && i != loclsock && i != log_fd)
(void)close(i);
}
/* reopen 0, 1, 2*/
(void)open("/dev/null", O_RDONLY, 0);
(void)open("/dev/null", O_WRONLY, 0);
(void)dup2(1, 2);
#else
fprintf (stderr,"Sorry, no support for WIN32 \n");
exit(1);
#endif
pvmsetlog(2);
if ((p = getenv("PVM_PATH")))
s = STRALLOC(p);
else
s = STRALLOC(DEFBINDIR);
epaths = colonsep(varsub(s));
PVM_FREE(s);
s = STRALLOC(DEFDEBUGGER);
debugger = varsub(s);
PVM_FREE(s);
if ((s = getenv("PVM_WD")))
p = STRALLOC(s);
else
p = STRALLOC(pvmgethome());
s = varsub(p);
if (chdir(s) == -1)
pvmlogperror(s);
PVM_FREE(p);
PVM_FREE(s);
runstate = PVMDSTARTUP;
return 0;
}